accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vi...@apache.org
Subject svn commit: r1496226 [9/13] - in /accumulo/branches/ACCUMULO-CURATOR: ./ assemble/ conf/examples/1GB/native-standalone/ conf/examples/1GB/standalone/ conf/examples/2GB/native-standalone/ conf/examples/2GB/standalone/ conf/examples/3GB/native-standalone...
Date Mon, 24 Jun 2013 21:34:25 GMT
Modified: accumulo/branches/ACCUMULO-CURATOR/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-CURATOR/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java?rev=1496226&r1=1496225&r2=1496226&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-CURATOR/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java (original)
+++ accumulo/branches/ACCUMULO-CURATOR/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java Mon Jun 24 21:34:20 2013
@@ -18,7 +18,6 @@ package org.apache.accumulo.server.table
 
 import static org.apache.accumulo.server.problems.ProblemType.TABLET_LOAD;
 
-import java.io.EOFException;
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
@@ -46,7 +45,6 @@ import java.util.SortedSet;
 import java.util.TimerTask;
 import java.util.TreeMap;
 import java.util.TreeSet;
-import java.util.UUID;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingDeque;
 import java.util.concurrent.CancellationException;
@@ -97,7 +95,6 @@ import org.apache.accumulo.core.data.thr
 import org.apache.accumulo.core.data.thrift.TMutation;
 import org.apache.accumulo.core.data.thrift.TRange;
 import org.apache.accumulo.core.data.thrift.UpdateErrors;
-import org.apache.accumulo.core.file.FileUtil;
 import org.apache.accumulo.core.iterators.IterationInterruptedException;
 import org.apache.accumulo.core.master.thrift.Compacting;
 import org.apache.accumulo.core.master.thrift.MasterClientService;
@@ -120,11 +117,11 @@ import org.apache.accumulo.core.tabletse
 import org.apache.accumulo.core.tabletserver.thrift.TabletStats;
 import org.apache.accumulo.core.util.AddressUtil;
 import org.apache.accumulo.core.util.ByteBufferUtil;
-import org.apache.accumulo.core.util.CachedConfiguration;
 import org.apache.accumulo.core.util.ColumnFQ;
 import org.apache.accumulo.core.util.Daemon;
 import org.apache.accumulo.core.util.LoggingRunnable;
 import org.apache.accumulo.core.util.Pair;
+import org.apache.accumulo.core.util.RootTable;
 import org.apache.accumulo.core.util.ServerServices;
 import org.apache.accumulo.core.util.ServerServices.Service;
 import org.apache.accumulo.core.util.SimpleThreadPool;
@@ -143,8 +140,9 @@ import org.apache.accumulo.server.client
 import org.apache.accumulo.server.conf.ServerConfiguration;
 import org.apache.accumulo.server.conf.TableConfiguration;
 import org.apache.accumulo.server.data.ServerMutation;
-import org.apache.accumulo.server.logger.LogFileKey;
-import org.apache.accumulo.server.logger.LogFileValue;
+import org.apache.accumulo.server.fs.FileRef;
+import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.server.fs.VolumeManagerImpl;
 import org.apache.accumulo.server.master.state.Assignment;
 import org.apache.accumulo.server.master.state.DistributedStoreException;
 import org.apache.accumulo.server.master.state.TServerInstance;
@@ -182,7 +180,6 @@ import org.apache.accumulo.server.tablet
 import org.apache.accumulo.server.tabletserver.metrics.TabletServerMinCMetrics;
 import org.apache.accumulo.server.tabletserver.metrics.TabletServerScanMetrics;
 import org.apache.accumulo.server.tabletserver.metrics.TabletServerUpdateMetrics;
-import org.apache.accumulo.server.trace.TraceFileSystem;
 import org.apache.accumulo.server.util.FileSystemMonitor;
 import org.apache.accumulo.server.util.Halt;
 import org.apache.accumulo.server.util.MapCounter;
@@ -205,14 +202,8 @@ import org.apache.accumulo.trace.instrum
 import org.apache.accumulo.trace.instrument.thrift.TraceWrap;
 import org.apache.accumulo.trace.thrift.TInfo;
 import org.apache.commons.collections.map.LRUMap;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.Trash;
-import org.apache.hadoop.hdfs.DistributedFileSystem;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.SequenceFile.Reader;
 import org.apache.hadoop.io.Text;
 import org.apache.log4j.Logger;
 import org.apache.thrift.TException;
@@ -242,11 +233,11 @@ public class TabletServer extends Abstra
   private ServerConfiguration serverConfig;
   private LogSorter logSorter = null;
   
-  public TabletServer(ServerConfiguration conf, FileSystem fs) {
+  public TabletServer(ServerConfiguration conf, VolumeManager fs) {
     super();
     this.serverConfig = conf;
     this.instance = conf.getInstance();
-    this.fs = TraceFileSystem.wrap(fs);
+    this.fs = fs;
     this.logSorter = new LogSorter(instance, fs, getSystemConfiguration());
     SimpleTimer.getInstance().schedule(new Runnable() {
       @Override
@@ -890,6 +881,13 @@ public class TabletServer extends Abstra
       for (Entry<TKeyExtent,Map<String,MapFileInfo>> entry : files.entrySet()) {
         TKeyExtent tke = entry.getKey();
         Map<String,MapFileInfo> fileMap = entry.getValue();
+        Map<FileRef,MapFileInfo> fileRefMap = new HashMap<FileRef,MapFileInfo>();
+        for (Entry<String,MapFileInfo> mapping : fileMap.entrySet()) {
+          Path path = new Path(mapping.getKey());
+          FileSystem ns = fs.getFileSystemByPath(path);
+          path = ns.makeQualified(path);
+          fileRefMap.put(new FileRef(path.toString(), path), mapping.getValue());
+        }
         
         Tablet importTablet = onlineTablets.get(new KeyExtent(tke));
         
@@ -897,7 +895,7 @@ public class TabletServer extends Abstra
           failures.add(tke);
         } else {
           try {
-            importTablet.importMapFiles(tid, fileMap, setTime);
+            importTablet.importMapFiles(tid, fileRefMap, setTime);
           } catch (IOException ioe) {
             log.info("files " + fileMap.keySet() + " not imported to " + new KeyExtent(tke) + ": " + ioe.getMessage());
             failures.add(tke);
@@ -1277,8 +1275,8 @@ public class TabletServer extends Abstra
         } else if (keyExtent.isRootTablet()) {
           throw new IllegalArgumentException("Cannot batch query root tablet with other tablets " + threadPoolExtent + " " + keyExtent);
         } else if (keyExtent.isMeta() && !threadPoolExtent.isMeta()) {
-          throw new IllegalArgumentException("Cannot batch query " + Constants.METADATA_TABLE_NAME + " and non " + Constants.METADATA_TABLE_NAME + " tablets "
-              + threadPoolExtent + " " + keyExtent);
+          throw new IllegalArgumentException("Cannot batch query " + MetadataTable.NAME + " and non " + MetadataTable.NAME + " tablets " + threadPoolExtent
+              + " " + keyExtent);
         }
         
       }
@@ -2051,20 +2049,14 @@ public class TabletServer extends Abstra
       
     }
     
-    /*
-     * (non-Javadoc)
-     * 
-     * @see org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Iface#removeLogs(org.apache.accumulo.trace.thrift.TInfo,
-     * org.apache.accumulo.core.security.thrift.Credentials, java.util.List)
-     */
     @Override
     public void removeLogs(TInfo tinfo, TCredentials credentials, List<String> filenames) throws TException {
       String myname = getClientAddressString();
       myname = myname.replace(':', '+');
-      Path logDir = new Path(Constants.getWalDirectory(acuConf), myname);
       Set<String> loggers = new HashSet<String>();
       logger.getLoggers(loggers);
       nextFile: for (String filename : filenames) {
+        // skip any log we're currently using
         for (String logger : loggers) {
           if (logger.contains(filename))
             continue nextFile;
@@ -2081,29 +2073,30 @@ public class TabletServer extends Abstra
             }
           }
         }
+        
         try {
-          String source = logDir + "/" + filename;
+          Path source = new Path(filename);
           if (acuConf.getBoolean(Property.TSERV_ARCHIVE_WALOGS)) {
-            String walogArchive = Constants.getBaseDir(acuConf) + "/walogArchive";
-            fs.mkdirs(new Path(walogArchive));
-            String dest = walogArchive + "/" + filename;
+            Path walogArchive = fs.matchingFileSystem(source, ServerConstants.getWalogArchives());
+            fs.mkdirs(walogArchive);
+            Path dest = new Path(walogArchive, source.getName());
             log.info("Archiving walog " + source + " to " + dest);
-            if (!fs.rename(new Path(source), new Path(dest)))
+            if (!fs.rename(source, dest))
               log.error("rename is unsuccessful");
           } else {
             log.info("Deleting walog " + filename);
-            Trash trash = new Trash(fs, fs.getConf());
-            Path sourcePath = new Path(source);
-            if (!trash.moveToTrash(sourcePath) && !fs.delete(sourcePath, true))
+            Path sourcePath = new Path(filename);
+            if (!fs.moveToTrash(sourcePath) && !fs.deleteRecursively(sourcePath))
               log.warn("Failed to delete walog " + source);
-            Path recoveryPath = new Path(Constants.getRecoveryDir(acuConf), filename);
-            try {
-              if (trash.moveToTrash(recoveryPath) || fs.delete(recoveryPath, true))
-                log.info("Deleted any recovery log " + filename);
-            } catch (FileNotFoundException ex) {
-              // ignore
+            for (String recovery : ServerConstants.getRecoveryDirs()) {
+              Path recoveryPath = new Path(recovery, source.getName());
+              try {
+                if (fs.moveToTrash(recoveryPath) || fs.deleteRecursively(recoveryPath))
+                  log.info("Deleted any recovery log " + filename);
+              } catch (FileNotFoundException ex) {
+                // ignore
+              }
             }
-            
           }
         } catch (IOException e) {
           log.warn("Error attempting to delete write-ahead log " + filename + ": " + e);
@@ -2559,7 +2552,7 @@ public class TabletServer extends Abstra
     }
   }
   
-  private FileSystem fs;
+  private VolumeManager fs;
   private Instance instance;
   
   private final SortedMap<KeyExtent,Tablet> onlineTablets = Collections.synchronizedSortedMap(new TreeMap<KeyExtent,Tablet>());
@@ -2880,7 +2873,7 @@ public class TabletServer extends Abstra
       throw new AccumuloException("Root tablet already has a location set");
     }
     
-    return new Pair<Text,KeyExtent>(new Text(Constants.ZROOT_TABLET), null);
+    return new Pair<Text,KeyExtent>(new Text(RootTable.ZROOT_TABLET), null);
   }
   
   public static Pair<Text,KeyExtent> verifyTabletInformation(KeyExtent extent, TServerInstance instance, SortedMap<Key,Value> tabletsKeyValues,
@@ -2890,12 +2883,14 @@ public class TabletServer extends Abstra
     if (extent.isRootTablet()) {
       return verifyRootTablet(extent, instance);
     }
+    String tableToVerify = MetadataTable.ID;
+    if (extent.isMeta())
+      tableToVerify = RootTable.ID;
     
-    List<ColumnFQ> columnsToFetch = Arrays.asList(new ColumnFQ[] {Constants.METADATA_DIRECTORY_COLUMN, Constants.METADATA_PREV_ROW_COLUMN,
-        Constants.METADATA_SPLIT_RATIO_COLUMN, Constants.METADATA_OLD_PREV_ROW_COLUMN, Constants.METADATA_TIME_COLUMN});
+    List<ColumnFQ> columnsToFetch = Arrays.asList(new ColumnFQ[] {MetadataTable.DIRECTORY_COLUMN, MetadataTable.PREV_ROW_COLUMN,
+        MetadataTable.SPLIT_RATIO_COLUMN, MetadataTable.OLD_PREV_ROW_COLUMN, MetadataTable.TIME_COLUMN});
     
-    ScannerImpl scanner = new ScannerImpl(HdfsZooInstance.getInstance(), SecurityConstants.getSystemCredentials(), Constants.METADATA_TABLE_ID,
-        Constants.NO_AUTHS);
+    ScannerImpl scanner = new ScannerImpl(HdfsZooInstance.getInstance(), SecurityConstants.getSystemCredentials(), tableToVerify, Authorizations.EMPTY);
     scanner.setRange(extent.toMetadataRange());
     
     TreeMap<Key,Value> tkv = new TreeMap<Key,Value>();
@@ -2918,7 +2913,7 @@ public class TabletServer extends Abstra
     
     Value oldPrevEndRow = null;
     for (Entry<Key,Value> entry : tabletsKeyValues.entrySet()) {
-      if (Constants.METADATA_OLD_PREV_ROW_COLUMN.hasColumns(entry.getKey())) {
+      if (MetadataTable.OLD_PREV_ROW_COLUMN.hasColumns(entry.getKey())) {
         oldPrevEndRow = entry.getValue();
       }
     }
@@ -2927,7 +2922,13 @@ public class TabletServer extends Abstra
       SortedMap<Text,SortedMap<ColumnFQ,Value>> tabletEntries;
       tabletEntries = MetadataTable.getTabletEntries(tabletsKeyValues, columnsToFetch);
       
-      KeyExtent fke = MetadataTable.fixSplit(metadataEntry, tabletEntries.get(metadataEntry), instance, SecurityConstants.getSystemCredentials(), lock);
+      KeyExtent fke;
+      try {
+        fke = MetadataTable.fixSplit(metadataEntry, tabletEntries.get(metadataEntry), instance, SecurityConstants.getSystemCredentials(), lock);
+      } catch (IOException e) {
+        log.error("Error fixing split " + metadataEntry);
+        throw new AccumuloException(e.toString());
+      }
       
       if (!fke.equals(extent)) {
         return new Pair<Text,KeyExtent>(null, fke);
@@ -2955,19 +2956,19 @@ public class TabletServer extends Abstra
         return null;
       }
       Text cf = key.getColumnFamily();
-      if (cf.equals(Constants.METADATA_FUTURE_LOCATION_COLUMN_FAMILY)) {
+      if (cf.equals(MetadataTable.FUTURE_LOCATION_COLUMN_FAMILY)) {
         if (future != null) {
           throw new AccumuloException("Tablet has multiple future locations " + extent);
         }
         future = new TServerInstance(entry.getValue(), key.getColumnQualifier());
-      } else if (cf.equals(Constants.METADATA_CURRENT_LOCATION_COLUMN_FAMILY)) {
+      } else if (cf.equals(MetadataTable.CURRENT_LOCATION_COLUMN_FAMILY)) {
         log.info("Tablet seems to be already assigned to " + new TServerInstance(entry.getValue(), key.getColumnQualifier()));
         return null;
-      } else if (Constants.METADATA_PREV_ROW_COLUMN.hasColumns(key)) {
+      } else if (MetadataTable.PREV_ROW_COLUMN.hasColumns(key)) {
         prevEndRow = entry.getValue();
-      } else if (Constants.METADATA_DIRECTORY_COLUMN.hasColumns(key)) {
+      } else if (MetadataTable.DIRECTORY_COLUMN.hasColumns(key)) {
         dir = entry.getValue();
-      } else if (Constants.METADATA_TIME_COLUMN.hasColumns(key)) {
+      } else if (MetadataTable.TIME_COLUMN.hasColumns(key)) {
         time = entry.getValue();
       }
     }
@@ -3221,13 +3222,11 @@ public class TabletServer extends Abstra
   public static void main(String[] args) throws IOException {
     try {
       SecurityUtil.serverLogin();
-      FileSystem fs = FileUtil.getFileSystem(CachedConfiguration.getInstance(), ServerConfiguration.getSiteConfiguration());
+      VolumeManager fs = VolumeManagerImpl.get();
       String hostname = Accumulo.getLocalAddress(args);
       Instance instance = HdfsZooInstance.getInstance();
       ServerConfiguration conf = new ServerConfiguration(instance);
       Accumulo.init(fs, conf, "tserver");
-      ensureHdfsSyncIsEnabled(fs);
-      recoverLocalWriteAheadLogs(fs, conf);
       TabletServer server = new TabletServer(conf, fs);
       server.config(hostname);
       Accumulo.enableTracing(hostname, "tserver");
@@ -3237,91 +3236,6 @@ public class TabletServer extends Abstra
     }
   }
   
-  private static void ensureHdfsSyncIsEnabled(FileSystem fs) {
-    if (fs instanceof DistributedFileSystem) {
-      if (!fs.getConf().getBoolean("dfs.durable.sync", false) && !fs.getConf().getBoolean("dfs.support.append", false)) {
-        String msg = "Must set dfs.durable.sync OR dfs.support.append to true.  Which one needs to be set depends on your version of HDFS.  See ACCUMULO-623. \n"
-            + "HADOOP RELEASE          VERSION           SYNC NAME             DEFAULT\n"
-            + "Apache Hadoop           0.20.205          dfs.support.append    false\n"
-            + "Apache Hadoop            0.23.x           dfs.support.append    true\n"
-            + "Apache Hadoop             1.0.x           dfs.support.append    false\n"
-            + "Apache Hadoop             1.1.x           dfs.durable.sync      true\n"
-            + "Apache Hadoop          2.0.0-2.0.2        dfs.support.append    true\n"
-            + "Cloudera CDH             3u0-3u3             ????               true\n"
-            + "Cloudera CDH               3u4            dfs.support.append    true\n"
-            + "Hortonworks HDP           `1.0            dfs.support.append    false\n"
-            + "Hortonworks HDP           `1.1            dfs.support.append    false";
-        log.fatal(msg);
-        System.exit(-1);
-      }
-      try {
-        // if this class exists
-        Class.forName("org.apache.hadoop.fs.CreateFlag");
-        // we're running hadoop 2.0, 1.1
-        if (!fs.getConf().getBoolean("dfs.datanode.synconclose", false)) {
-          log.warn("dfs.datanode.synconclose set to false: data loss is possible on system reset or power loss");
-        }
-      } catch (ClassNotFoundException ex) {
-        // hadoop 1.0
-      }
-    }
-    
-  }
-  
-  /**
-   * Copy local walogs into HDFS on an upgrade
-   * 
-   */
-  public static void recoverLocalWriteAheadLogs(FileSystem fs, ServerConfiguration serverConf) throws IOException {
-    FileSystem localfs = FileSystem.getLocal(fs.getConf()).getRawFileSystem();
-    AccumuloConfiguration conf = serverConf.getConfiguration();
-    String localWalDirectories = conf.get(Property.LOGGER_DIR);
-    for (String localWalDirectory : localWalDirectories.split(",")) {
-      if (!localWalDirectory.startsWith("/")) {
-        localWalDirectory = System.getenv("ACCUMULO_HOME") + "/" + localWalDirectory;
-      }
-      
-      FileStatus status = null;
-      try {
-        status = localfs.getFileStatus(new Path(localWalDirectory));
-      } catch (FileNotFoundException fne) {}
-      
-      if (status == null || !status.isDir()) {
-        log.debug("Local walog dir " + localWalDirectory + " not found ");
-        continue;
-      }
-      
-      for (FileStatus file : localfs.listStatus(new Path(localWalDirectory))) {
-        String name = file.getPath().getName();
-        try {
-          UUID.fromString(name);
-        } catch (IllegalArgumentException ex) {
-          log.info("Ignoring non-log file " + name + " in " + localWalDirectory);
-          continue;
-        }
-        LogFileKey key = new LogFileKey();
-        LogFileValue value = new LogFileValue();
-        log.info("Openning local log " + file.getPath());
-        Reader reader = new SequenceFile.Reader(localfs, file.getPath(), localfs.getConf());
-        Path tmp = new Path(Constants.getWalDirectory(conf) + "/" + name + ".copy");
-        FSDataOutputStream writer = fs.create(tmp);
-        while (reader.next(key, value)) {
-          try {
-            key.write(writer);
-            value.write(writer);
-          } catch (EOFException ex) {
-            break;
-          }
-        }
-        writer.close();
-        reader.close();
-        fs.rename(tmp, new Path(tmp.getParent(), name));
-        log.info("Copied local log " + name);
-        localfs.delete(new Path(localWalDirectory, name), true);
-      }
-    }
-  }
-  
   public void minorCompactionFinished(CommitSession tablet, String newDatafile, int walogSeq) throws IOException {
     totalMinorCompactions++;
     logger.minorCompactionFinished(tablet, newDatafile, walogSeq);
@@ -3331,8 +3245,9 @@ public class TabletServer extends Abstra
     logger.minorCompactionStarted(tablet, lastUpdateSequence, newMapfileLocation);
   }
   
-  public void recover(Tablet tablet, List<LogEntry> logEntries, Set<String> tabletFiles, MutationReceiver mutationReceiver) throws IOException {
-    List<String> recoveryLogs = new ArrayList<String>();
+  public void recover(VolumeManager fs, Tablet tablet, List<LogEntry> logEntries, Set<String> tabletFiles, MutationReceiver mutationReceiver)
+      throws IOException {
+    List<Path> recoveryLogs = new ArrayList<Path>();
     List<LogEntry> sorted = new ArrayList<LogEntry>(logEntries);
     Collections.sort(sorted, new Comparator<LogEntry>() {
       @Override
@@ -3341,14 +3256,13 @@ public class TabletServer extends Abstra
       }
     });
     for (LogEntry entry : sorted) {
-      String recovery = null;
+      Path recovery = null;
       for (String log : entry.logSet) {
         String[] parts = log.split("/"); // "host:port/filename"
-        log = ServerConstants.getRecoveryDir() + "/" + parts[1];
-        Path finished = new Path(log + "/finished");
+        Path finished = new Path(fs.getFullPath(ServerConstants.getRecoveryDirs(), parts[parts.length - 1]), "finished");
         TabletServer.log.info("Looking for " + finished);
         if (fs.exists(finished)) {
-          recovery = log;
+          recovery = finished.getParent();
           break;
         }
       }
@@ -3356,7 +3270,7 @@ public class TabletServer extends Abstra
         throw new IOException("Unable to find recovery files for extent " + tablet.getExtent() + " logEntry: " + entry);
       recoveryLogs.add(recovery);
     }
-    logger.recover(tablet, recoveryLogs, tabletFiles, mutationReceiver);
+    logger.recover(fs, tablet, recoveryLogs, tabletFiles, mutationReceiver);
   }
   
   private final AtomicInteger logIdGenerator = new AtomicInteger();
@@ -3547,7 +3461,7 @@ public class TabletServer extends Abstra
     return new DfsLogger.ServerResources() {
       
       @Override
-      public FileSystem getFileSystem() {
+      public VolumeManager getFileSystem() {
         return fs;
       }
       
@@ -3563,4 +3477,8 @@ public class TabletServer extends Abstra
     };
   }
   
+  public VolumeManager getFileSystem() {
+    return fs;
+  }
+  
 }

Modified: accumulo/branches/ACCUMULO-CURATOR/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServerResourceManager.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-CURATOR/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServerResourceManager.java?rev=1496226&r1=1496225&r2=1496226&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-CURATOR/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServerResourceManager.java (original)
+++ accumulo/branches/ACCUMULO-CURATOR/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServerResourceManager.java Mon Jun 24 21:34:20 2013
@@ -36,7 +36,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.accumulo.trace.instrument.TraceExecutorService;
-import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.client.Instance;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.Property;
@@ -46,14 +45,16 @@ import org.apache.accumulo.core.util.Dae
 import org.apache.accumulo.core.util.LoggingRunnable;
 import org.apache.accumulo.core.util.MetadataTable.DataFileValue;
 import org.apache.accumulo.core.util.NamingThreadFactory;
+import org.apache.accumulo.core.util.RootTable;
 import org.apache.accumulo.core.util.UtilWaitThread;
 import org.apache.accumulo.server.conf.ServerConfiguration;
+import org.apache.accumulo.server.fs.FileRef;
+import org.apache.accumulo.server.fs.VolumeManager;
 import org.apache.accumulo.server.tabletserver.FileManager.ScanFileManager;
 import org.apache.accumulo.server.tabletserver.Tablet.MajorCompactionReason;
 import org.apache.accumulo.server.tabletserver.Tablet.MinorCompactionReason;
 import org.apache.accumulo.server.util.time.SimpleTimer;
 import org.apache.accumulo.start.classloader.vfs.AccumuloVFSClassLoader;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.log4j.Logger;
 
 /**
@@ -140,7 +141,7 @@ public class TabletServerResourceManager
     return addEs(name, new ThreadPoolExecutor(min, max, timeout, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new NamingThreadFactory(name)));
   }
   
-  public TabletServerResourceManager(Instance instance, FileSystem fs) {
+  public TabletServerResourceManager(Instance instance, VolumeManager fs) {
     this.conf = new ServerConfiguration(instance);
     final AccumuloConfiguration acuConf = conf.getConfiguration();
     
@@ -455,10 +456,10 @@ public class TabletServerResourceManager
   }
   
   private class MapFileInfo {
-    private final String path;
+    private final FileRef path;
     private final long size;
     
-    MapFileInfo(String path, long size) {
+    MapFileInfo(FileRef path, long size) {
       this.path = path;
       this.size = size;
     }
@@ -544,10 +545,10 @@ public class TabletServerResourceManager
     // BEGIN methods that Tablets call to make decisions about major compaction
     // when too many files are open, we may want tablets to compact down
     // to one map file
-    Map<String,Long> findMapFilesToCompact(SortedMap<String,DataFileValue> tabletFiles, MajorCompactionReason reason) {
+    Map<FileRef,Long> findMapFilesToCompact(SortedMap<FileRef,DataFileValue> tabletFiles, MajorCompactionReason reason) {
       if (reason == MajorCompactionReason.USER) {
-        Map<String,Long> files = new HashMap<String,Long>();
-        for (Entry<String,DataFileValue> entry : tabletFiles.entrySet()) {
+        Map<FileRef,Long> files = new HashMap<FileRef,Long>();
+        for (Entry<FileRef,DataFileValue> entry : tabletFiles.entrySet()) {
           files.put(entry.getKey(), entry.getValue().getSize());
         }
         return files;
@@ -572,7 +573,7 @@ public class TabletServerResourceManager
       int maxFilesToCompact = tableConf.getCount(Property.TSERV_MAJC_THREAD_MAXOPEN);
       int maxFilesPerTablet = tableConf.getMaxFilesPerTablet();
       
-      for (Entry<String,DataFileValue> entry : tabletFiles.entrySet()) {
+      for (Entry<FileRef,DataFileValue> entry : tabletFiles.entrySet()) {
         candidateFiles.add(new MapFileInfo(entry.getKey(), entry.getValue().getSize()));
       }
       
@@ -581,7 +582,7 @@ public class TabletServerResourceManager
         totalSize += mfi.size;
       }
       
-      Map<String,Long> files = new HashMap<String,Long>();
+      Map<FileRef,Long> files = new HashMap<FileRef,Long>();
       
       while (candidateFiles.size() > 1) {
         MapFileInfo max = candidateFiles.last();
@@ -607,12 +608,12 @@ public class TabletServerResourceManager
       
       if (files.size() < totalFilesToCompact) {
         
-        TreeMap<String,DataFileValue> tfc = new TreeMap<String,DataFileValue>(tabletFiles);
+        TreeMap<FileRef,DataFileValue> tfc = new TreeMap<FileRef,DataFileValue>(tabletFiles);
         tfc.keySet().removeAll(files.keySet());
         
         // put data in candidateFiles to sort it
         candidateFiles.clear();
-        for (Entry<String,DataFileValue> entry : tfc.entrySet())
+        for (Entry<FileRef,DataFileValue> entry : tfc.entrySet())
           candidateFiles.add(new MapFileInfo(entry.getKey(), entry.getValue().getSize()));
         
         for (MapFileInfo mfi : candidateFiles) {
@@ -628,7 +629,7 @@ public class TabletServerResourceManager
       return files;
     }
     
-    boolean needsMajorCompaction(SortedMap<String,DataFileValue> tabletFiles, MajorCompactionReason reason) {
+    boolean needsMajorCompaction(SortedMap<FileRef,DataFileValue> tabletFiles, MajorCompactionReason reason) {
       if (closed)
         return false;// throw new IOException("closed");
         
@@ -709,7 +710,7 @@ public class TabletServerResourceManager
   }
   
   public void executeMajorCompaction(KeyExtent tablet, Runnable compactionTask) {
-    if (tablet.equals(Constants.ROOT_TABLET_EXTENT)) {
+    if (tablet.equals(RootTable.EXTENT)) {
       rootMajorCompactionThreadPool.execute(compactionTask);
     } else if (tablet.isMeta()) {
       defaultMajorCompactionThreadPool.execute(compactionTask);

Modified: accumulo/branches/ACCUMULO-CURATOR/server/src/main/java/org/apache/accumulo/server/tabletserver/log/DfsLogger.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-CURATOR/server/src/main/java/org/apache/accumulo/server/tabletserver/log/DfsLogger.java?rev=1496226&r1=1496225&r2=1496226&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-CURATOR/server/src/main/java/org/apache/accumulo/server/tabletserver/log/DfsLogger.java (original)
+++ accumulo/branches/ACCUMULO-CURATOR/server/src/main/java/org/apache/accumulo/server/tabletserver/log/DfsLogger.java Mon Jun 24 21:34:20 2013
@@ -29,7 +29,6 @@ import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.EnumSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -37,26 +36,21 @@ import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.LinkedBlockingQueue;
 
-import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.KeyExtent;
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.util.Daemon;
 import org.apache.accumulo.core.util.StringUtil;
+import org.apache.accumulo.server.ServerConstants;
+import org.apache.accumulo.server.fs.VolumeManager;
 import org.apache.accumulo.server.logger.LogFileKey;
 import org.apache.accumulo.server.logger.LogFileValue;
 import org.apache.accumulo.server.master.state.TServerInstance;
 import org.apache.accumulo.server.tabletserver.TabletMutations;
-import org.apache.accumulo.server.trace.TraceFileSystem;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-//import org.apache.hadoop.fs.CreateFlag;
-//import org.apache.hadoop.fs.Syncable;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.util.Progressable;
 import org.apache.log4j.Logger;
 
 /**
@@ -80,7 +74,7 @@ public class DfsLogger {
   public interface ServerResources {
     AccumuloConfiguration getConfiguration();
     
-    FileSystem getFileSystem();
+    VolumeManager getFileSystem();
     
     Set<TServerInstance> getCurrentTServers();
   }
@@ -180,11 +174,6 @@ public class DfsLogger {
     }
   }
   
-  /*
-   * (non-Javadoc)
-   * 
-   * @see org.apache.accumulo.server.tabletserver.log.IRemoteLogger#equals(java.lang.Object)
-   */
   @Override
   public boolean equals(Object obj) {
     // filename is unique
@@ -195,11 +184,6 @@ public class DfsLogger {
     return false;
   }
   
-  /*
-   * (non-Javadoc)
-   * 
-   * @see org.apache.accumulo.server.tabletserver.log.IRemoteLogger#hashCode()
-   */
   @Override
   public int hashCode() {
     // filename is unique
@@ -217,13 +201,13 @@ public class DfsLogger {
     this.conf = conf;
   }
   
-  public DfsLogger(ServerResources conf, String logger, String filename) throws IOException {
+  public DfsLogger(ServerResources conf, String logger, Path filename) throws IOException {
     this.conf = conf;
     this.logger = logger;
-    this.logPath = new Path(Constants.getWalDirectory(conf.getConfiguration()), filename);
+    this.logPath = filename;
   }
   
-  public static FSDataInputStream readHeader(FileSystem fs, Path path, Map<String,String> opts) throws IOException {
+  public static FSDataInputStream readHeader(VolumeManager fs, Path path, Map<String,String> opts) throws IOException {
     FSDataInputStream file = fs.open(path);
     try {
       byte[] magic = LOG_FILE_HEADER_V2.getBytes();
@@ -252,23 +236,20 @@ public class DfsLogger {
     logger = StringUtil.join(Arrays.asList(address.split(":")), "+");
     
     log.debug("DfsLogger.open() begin");
+    VolumeManager fs = conf.getFileSystem();
     
-    logPath = new Path(Constants.getWalDirectory(conf.getConfiguration()) + "/" + logger + "/" + filename);
+    logPath = new Path(fs.choose(ServerConstants.getWalDirs()) + "/" + logger + "/" + filename);
     try {
-      FileSystem fs = conf.getFileSystem();
       short replication = (short) conf.getConfiguration().getCount(Property.TSERV_WAL_REPLICATION);
       if (replication == 0)
-        replication = fs.getDefaultReplication();
+        replication = fs.getDefaultReplication(logPath);
       long blockSize = conf.getConfiguration().getMemoryInBytes(Property.TSERV_WAL_BLOCKSIZE);
       if (blockSize == 0)
         blockSize = (long) (conf.getConfiguration().getMemoryInBytes(Property.TSERV_WALOG_MAX_SIZE) * 1.1);
-      int checkSum = fs.getConf().getInt("io.bytes.per.checksum", 512);
-      blockSize -= blockSize % checkSum;
-      blockSize = Math.max(blockSize, checkSum);
       if (conf.getConfiguration().getBoolean(Property.TSERV_WAL_SYNC))
-        logFile = create(fs, logPath, true, fs.getConf().getInt("io.file.buffer.size", 4096), replication, blockSize);
+        logFile = fs.createSyncable(logPath, 0, replication, blockSize);
       else
-        logFile = fs.create(logPath, true, fs.getConf().getInt("io.file.buffer.size", 4096), replication, blockSize);
+        logFile = fs.create(logPath, true, 0, replication, blockSize);
       
       try {
         NoSuchMethodException e = null;
@@ -282,15 +263,13 @@ public class DfsLogger {
           // hsync: send data to datanodes and sync the data to disk
           sync = logFile.getClass().getMethod("hsync");
           e = null;
-        } catch (NoSuchMethodException ex) {
-        }
+        } catch (NoSuchMethodException ex) {}
         if (e != null)
           throw new RuntimeException(e);
       } catch (Exception e) {
         throw new RuntimeException(e);
       }
       
-      
       // Initialize the crypto operations.
       @SuppressWarnings("deprecation")
       org.apache.accumulo.core.security.crypto.CryptoModule cryptoModule = org.apache.accumulo.core.security.crypto.CryptoModuleFactory.getCryptoModule(conf
@@ -336,48 +315,6 @@ public class DfsLogger {
     t.start();
   }
   
-  private FSDataOutputStream create(FileSystem fs, Path logPath, boolean b, int buffersize, short replication, long blockSize) throws IOException {
-    try {
-      // This... 
-      //    EnumSet<CreateFlag> set = EnumSet.of(CreateFlag.SYNC_BLOCK, CreateFlag.CREATE);
-      //    return fs.create(logPath, FsPermission.getDefault(), set, buffersize, replication, blockSize, null);
-      // Becomes this:
-      Class<?> createFlags = Class.forName("org.apache.hadoop.fs.CreateFlag");
-      List<Enum<?>> flags = new ArrayList<Enum<?>>();
-      if (createFlags.isEnum()) {
-        for (Object constant : createFlags.getEnumConstants()) {
-          if (constant.toString().equals("SYNC_BLOCK")) {
-            flags.add((Enum<?>)constant);
-            log.debug("Found synch enum " + constant);
-          }
-          if (constant.toString().equals("CREATE")) {
-            flags.add((Enum<?>)constant);
-            log.debug("Found CREATE enum " + constant);
-          }
-        }
-      }
-      Object set = EnumSet.class.getMethod("of", java.lang.Enum.class, java.lang.Enum.class).invoke(null, flags.get(0), flags.get(1));
-      log.debug("CreateFlag set: " + set);
-      if (fs instanceof TraceFileSystem) {
-        fs = ((TraceFileSystem)fs).getImplementation();
-      }
-      Method create = fs.getClass().getMethod("create", Path.class, FsPermission.class, EnumSet.class, Integer.TYPE, Short.TYPE, Long.TYPE, Progressable.class);
-      log.debug("creating " + logPath + " with SYNCH_BLOCK flag");
-      return (FSDataOutputStream)create.invoke(fs, logPath, FsPermission.getDefault(), set, buffersize, replication, blockSize, null);
-    } catch (ClassNotFoundException ex) {
-      // Expected in hadoop 1.0
-      return fs.create(logPath, b, buffersize, replication, blockSize);
-    } catch (Exception ex) {
-      log.debug(ex, ex);
-      return fs.create(logPath, b, buffersize, replication, blockSize);
-    }
-  }
-
-  /*
-   * (non-Javadoc)
-   * 
-   * @see org.apache.accumulo.server.tabletserver.log.IRemoteLogger#toString()
-   */
   @Override
   public String toString() {
     return getLogger() + "/" + getFileName();
@@ -388,7 +325,7 @@ public class DfsLogger {
   }
   
   public String getFileName() {
-    return logPath.getName();
+    return logPath.toString();
   }
   
   public void close() throws IOException {

Modified: accumulo/branches/ACCUMULO-CURATOR/server/src/main/java/org/apache/accumulo/server/tabletserver/log/LogSorter.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-CURATOR/server/src/main/java/org/apache/accumulo/server/tabletserver/log/LogSorter.java?rev=1496226&r1=1496225&r2=1496226&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-CURATOR/server/src/main/java/org/apache/accumulo/server/tabletserver/log/LogSorter.java (original)
+++ accumulo/branches/ACCUMULO-CURATOR/server/src/main/java/org/apache/accumulo/server/tabletserver/log/LogSorter.java Mon Jun 24 21:34:20 2013
@@ -37,6 +37,7 @@ import org.apache.accumulo.core.master.t
 import org.apache.accumulo.core.util.Pair;
 import org.apache.accumulo.core.util.SimpleThreadPool;
 import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.server.fs.VolumeManager;
 import org.apache.accumulo.server.logger.LogFileKey;
 import org.apache.accumulo.server.logger.LogFileValue;
 import org.apache.accumulo.server.zookeeper.DistributedWorkQueue;
@@ -54,7 +55,7 @@ import org.apache.zookeeper.KeeperExcept
 public class LogSorter {
   
   private static final Logger log = Logger.getLogger(LogSorter.class);
-  FileSystem fs;
+  VolumeManager fs;
   AccumuloConfiguration conf;
   
   private final Map<String,LogProcessor> currentWork = Collections.synchronizedMap(new HashMap<String,LogProcessor>());
@@ -74,21 +75,24 @@ public class LogSorter {
     
     @Override
     public void process(String child, byte[] data) {
-      String dest = Constants.getRecoveryDir(conf) + "/" + child;
-      String src = new String(data);
-      String name = new Path(src).getName();
+      String work = new String(data);
+      String[] parts = work.split("\\|");
+      String src = parts[0];
+      String dest = parts[1];
+      String sortId = new Path(src).getName();
+      log.debug("Sorting " + src + " to " + dest + " using sortId " + sortId);
       
       synchronized (currentWork) {
-        if (currentWork.containsKey(name))
+        if (currentWork.containsKey(sortId))
           return;
-        currentWork.put(name, this);
+        currentWork.put(sortId, this);
       }
       
       try {
         log.info("Copying " + src + " to " + dest);
-        sort(name, new Path(src), dest);
+        sort(sortId, new Path(src), dest);
       } finally {
-        currentWork.remove(name);
+        currentWork.remove(sortId);
       }
       
     }
@@ -104,7 +108,7 @@ public class LogSorter {
       try {
         
         // the following call does not throw an exception if the file/dir does not exist
-        fs.delete(new Path(destPath), true);
+        fs.deleteRecursively(new Path(destPath));
         
         FSDataInputStream tmpInput = fs.open(srcPath);
         DataInputStream tmpDecryptingInput = tmpInput;
@@ -192,8 +196,9 @@ public class LogSorter {
     }
     
     private void writeBuffer(String destPath, ArrayList<Pair<LogFileKey,LogFileValue>> buffer, int part) throws IOException {
-      String path = destPath + String.format("/part-r-%05d", part++);
-      MapFile.Writer output = new MapFile.Writer(fs.getConf(), fs, path, LogFileKey.class, LogFileValue.class);
+      Path path = new Path(destPath, String.format("part-r-%05d", part++));
+      FileSystem ns = fs.getFileSystemByPath(path);
+      MapFile.Writer output = new MapFile.Writer(ns.getConf(), ns, path.toString(), LogFileKey.class, LogFileValue.class);
       try {
         Collections.sort(buffer, new Comparator<Pair<LogFileKey,LogFileValue>>() {
           @Override
@@ -233,7 +238,7 @@ public class LogSorter {
   ThreadPoolExecutor threadPool;
   private final Instance instance;
   
-  public LogSorter(Instance instance, FileSystem fs, AccumuloConfiguration conf) {
+  public LogSorter(Instance instance, VolumeManager fs, AccumuloConfiguration conf) {
     this.instance = instance;
     this.fs = fs;
     this.conf = conf;

Modified: accumulo/branches/ACCUMULO-CURATOR/server/src/main/java/org/apache/accumulo/server/tabletserver/log/MultiReader.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-CURATOR/server/src/main/java/org/apache/accumulo/server/tabletserver/log/MultiReader.java?rev=1496226&r1=1496225&r2=1496226&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-CURATOR/server/src/main/java/org/apache/accumulo/server/tabletserver/log/MultiReader.java (original)
+++ accumulo/branches/ACCUMULO-CURATOR/server/src/main/java/org/apache/accumulo/server/tabletserver/log/MultiReader.java Mon Jun 24 21:34:20 2013
@@ -19,8 +19,8 @@ package org.apache.accumulo.server.table
 import java.io.EOFException;
 import java.io.IOException;
 
+import org.apache.accumulo.server.fs.VolumeManager;
 import org.apache.commons.collections.buffer.PriorityBuffer;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -87,16 +87,17 @@ public class MultiReader {
   
   private PriorityBuffer heap = new PriorityBuffer();
   
-  public MultiReader(FileSystem fs, Configuration conf, String directory) throws IOException {
+  public MultiReader(VolumeManager fs, Path directory) throws IOException {
     boolean foundFinish = false;
-    for (FileStatus child : fs.listStatus(new Path(directory))) {
+    for (FileStatus child : fs.listStatus(directory)) {
       if (child.getPath().getName().startsWith("_"))
         continue;
       if (child.getPath().getName().equals("finished")) {
         foundFinish = true;
         continue;
       }
-      heap.add(new Index(new Reader(fs, child.getPath().toString(), conf)));
+      FileSystem ns = fs.getFileSystemByPath(child.getPath());
+      heap.add(new Index(new Reader(ns, child.getPath().toString(), ns.getConf())));
     }
     if (!foundFinish)
       throw new IOException("Sort \"finished\" flag not found in " + directory);

Modified: accumulo/branches/ACCUMULO-CURATOR/server/src/main/java/org/apache/accumulo/server/tabletserver/log/SortedLogRecovery.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-CURATOR/server/src/main/java/org/apache/accumulo/server/tabletserver/log/SortedLogRecovery.java?rev=1496226&r1=1496225&r2=1496226&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-CURATOR/server/src/main/java/org/apache/accumulo/server/tabletserver/log/SortedLogRecovery.java (original)
+++ accumulo/branches/ACCUMULO-CURATOR/server/src/main/java/org/apache/accumulo/server/tabletserver/log/SortedLogRecovery.java Mon Jun 24 21:34:20 2013
@@ -29,14 +29,10 @@ import java.util.Set;
 
 import org.apache.accumulo.core.data.KeyExtent;
 import org.apache.accumulo.core.data.Mutation;
-import org.apache.accumulo.core.file.FileUtil;
-import org.apache.accumulo.core.util.CachedConfiguration;
-import org.apache.accumulo.server.conf.ServerConfiguration;
+import org.apache.accumulo.server.fs.VolumeManager;
 import org.apache.accumulo.server.logger.LogFileKey;
 import org.apache.accumulo.server.logger.LogFileValue;
-import org.apache.accumulo.server.trace.TraceFileSystem;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.log4j.Logger;
 
 /**
@@ -58,7 +54,11 @@ public class SortedLogRecovery {
     public UnusedException() { super(); }
   }
 
-  public SortedLogRecovery() {}
+  private VolumeManager fs;
+
+  public SortedLogRecovery(VolumeManager fs) {
+    this.fs = fs;
+  }
   
   private enum Status {
     INITIAL, LOOKING_FOR_FINISH, COMPLETE
@@ -89,15 +89,13 @@ public class SortedLogRecovery {
     }
   }
   
-  public void recover(KeyExtent extent, List<String> recoveryLogs, Set<String> tabletFiles, MutationReceiver mr) throws IOException {
-    Configuration conf = CachedConfiguration.getInstance();
-    FileSystem fs = TraceFileSystem.wrap(FileUtil.getFileSystem(conf, ServerConfiguration.getSiteConfiguration()));
+  public void recover(KeyExtent extent, List<Path> recoveryLogs, Set<String> tabletFiles, MutationReceiver mr) throws IOException {
     int[] tids = new int[recoveryLogs.size()];
     LastStartToFinish lastStartToFinish = new LastStartToFinish();
     for (int i = 0; i < recoveryLogs.size(); i++) {
-      String logfile = recoveryLogs.get(i);
+      Path logfile = recoveryLogs.get(i);
       log.info("Looking at mutations from " + logfile + " for " + extent);
-      MultiReader reader = new MultiReader(fs, conf, logfile);
+      MultiReader reader = new MultiReader(fs, logfile);
       try {
         try {
           tids[i] = findLastStartToFinish(reader, i, extent, tabletFiles, lastStartToFinish);
@@ -122,8 +120,8 @@ public class SortedLogRecovery {
       throw new RuntimeException("COMPACTION_FINISH (without preceding COMPACTION_START) not followed by successful minor compaction");
     
     for (int i = 0; i < recoveryLogs.size(); i++) {
-      String logfile = recoveryLogs.get(i);
-      MultiReader reader = new MultiReader(fs, conf, logfile);
+      Path logfile = recoveryLogs.get(i);
+      MultiReader reader = new MultiReader(fs, logfile);
       try {
         playbackMutations(reader, tids[i], lastStartToFinish, mr);
       } finally {
@@ -191,6 +189,7 @@ public class SortedLogRecovery {
         lastStartToFinish.update(fileno, key.seq);
         
         // Tablet server finished the minor compaction, but didn't remove the entry from the METADATA table.
+        log.error("filename in compaction start " + key.filename);
         if (tabletFiles.contains(key.filename))
           lastStartToFinish.update(-1);
       } else if (key.event == COMPACTION_FINISH) {

Modified: accumulo/branches/ACCUMULO-CURATOR/server/src/main/java/org/apache/accumulo/server/tabletserver/log/TabletServerLogger.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-CURATOR/server/src/main/java/org/apache/accumulo/server/tabletserver/log/TabletServerLogger.java?rev=1496226&r1=1496225&r2=1496226&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-CURATOR/server/src/main/java/org/apache/accumulo/server/tabletserver/log/TabletServerLogger.java (original)
+++ accumulo/branches/ACCUMULO-CURATOR/server/src/main/java/org/apache/accumulo/server/tabletserver/log/TabletServerLogger.java Mon Jun 24 21:34:20 2013
@@ -34,11 +34,13 @@ import org.apache.accumulo.core.conf.Pro
 import org.apache.accumulo.core.data.KeyExtent;
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.server.fs.VolumeManager;
 import org.apache.accumulo.server.tabletserver.Tablet;
 import org.apache.accumulo.server.tabletserver.Tablet.CommitSession;
 import org.apache.accumulo.server.tabletserver.TabletMutations;
 import org.apache.accumulo.server.tabletserver.TabletServer;
 import org.apache.accumulo.server.tabletserver.log.DfsLogger.LoggerOperation;
+import org.apache.hadoop.fs.Path;
 import org.apache.log4j.Logger;
 
 /**
@@ -413,11 +415,11 @@ public class TabletServerLogger {
     return seq;
   }
   
-  public void recover(Tablet tablet, List<String> logs, Set<String> tabletFiles, MutationReceiver mr) throws IOException {
+  public void recover(VolumeManager fs, Tablet tablet, List<Path> logs, Set<String> tabletFiles, MutationReceiver mr) throws IOException {
     if (!enabled(tablet))
       return;
     try {
-      SortedLogRecovery recovery = new SortedLogRecovery();
+      SortedLogRecovery recovery = new SortedLogRecovery(fs);
       KeyExtent extent = tablet.getExtent();
       recovery.recover(extent, logs, tabletFiles, mr);
     } catch (Exception e) {

Modified: accumulo/branches/ACCUMULO-CURATOR/server/src/main/java/org/apache/accumulo/server/trace/TraceServer.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-CURATOR/server/src/main/java/org/apache/accumulo/server/trace/TraceServer.java?rev=1496226&r1=1496225&r2=1496226&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-CURATOR/server/src/main/java/org/apache/accumulo/server/trace/TraceServer.java (original)
+++ accumulo/branches/ACCUMULO-CURATOR/server/src/main/java/org/apache/accumulo/server/trace/TraceServer.java Mon Jun 24 21:34:20 2013
@@ -28,6 +28,7 @@ import org.apache.accumulo.core.client.B
 import org.apache.accumulo.core.client.BatchWriterConfig;
 import org.apache.accumulo.core.client.Connector;
 import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.IteratorSetting;
 import org.apache.accumulo.core.client.MutationsRejectedException;
 import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
 import org.apache.accumulo.core.client.security.tokens.AuthenticationToken.Properties;
@@ -36,17 +37,18 @@ import org.apache.accumulo.core.conf.Acc
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.file.FileUtil;
+import org.apache.accumulo.core.iterators.user.AgeOffFilter;
 import org.apache.accumulo.core.security.SecurityUtil;
 import org.apache.accumulo.core.trace.TraceFormatter;
 import org.apache.accumulo.core.util.AddressUtil;
-import org.apache.accumulo.core.util.CachedConfiguration;
 import org.apache.accumulo.core.util.UtilWaitThread;
 import org.apache.accumulo.core.zookeeper.ZooUtil;
 import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
 import org.apache.accumulo.server.Accumulo;
 import org.apache.accumulo.server.client.HdfsZooInstance;
 import org.apache.accumulo.server.conf.ServerConfiguration;
+import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.server.fs.VolumeManagerImpl;
 import org.apache.accumulo.server.util.time.SimpleTimer;
 import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
 import org.apache.accumulo.start.classloader.AccumuloClassLoader;
@@ -54,7 +56,6 @@ import org.apache.accumulo.trace.instrum
 import org.apache.accumulo.trace.thrift.RemoteSpan;
 import org.apache.accumulo.trace.thrift.SpanReceiver.Iface;
 import org.apache.accumulo.trace.thrift.SpanReceiver.Processor;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.io.Text;
 import org.apache.log4j.Logger;
 import org.apache.thrift.TByteArrayOutputStream;
@@ -185,6 +186,9 @@ public class TraceServer implements Watc
         connector = serverConfiguration.getInstance().getConnector(principal, at);
         if (!connector.tableOperations().exists(table)) {
           connector.tableOperations().create(table);
+          IteratorSetting setting = new IteratorSetting(10, "ageoff", AgeOffFilter.class.getName());
+          AgeOffFilter.setTTL(setting, 7 * 24 * 60 * 60 * 1000l);
+          connector.tableOperations().attachIterator(table, setting);
         }
         connector.tableOperations().setProperty(table, Property.TABLE_FORMATTER_CLASS.getKey(), TraceFormatter.class.getName());
         break;
@@ -254,7 +258,7 @@ public class TraceServer implements Watc
     SecurityUtil.serverLogin();
     Instance instance = HdfsZooInstance.getInstance();
     ServerConfiguration conf = new ServerConfiguration(instance);
-    FileSystem fs = FileUtil.getFileSystem(CachedConfiguration.getInstance(), conf.getConfiguration());
+    VolumeManager fs = VolumeManagerImpl.get();
     Accumulo.init(fs, conf, "tracer");
     String hostname = Accumulo.getLocalAddress(args);
     TraceServer server = new TraceServer(conf, hostname);

Modified: accumulo/branches/ACCUMULO-CURATOR/server/src/main/java/org/apache/accumulo/server/util/AddFilesWithMissingEntries.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-CURATOR/server/src/main/java/org/apache/accumulo/server/util/AddFilesWithMissingEntries.java?rev=1496226&r1=1496225&r2=1496226&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-CURATOR/server/src/main/java/org/apache/accumulo/server/util/AddFilesWithMissingEntries.java (original)
+++ accumulo/branches/ACCUMULO-CURATOR/server/src/main/java/org/apache/accumulo/server/util/AddFilesWithMissingEntries.java Mon Jun 24 21:34:20 2013
@@ -20,7 +20,6 @@ import java.util.HashSet;
 import java.util.Map.Entry;
 import java.util.Set;
 
-import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.cli.BatchWriterOpts;
 import org.apache.accumulo.core.client.MultiTableBatchWriter;
 import org.apache.accumulo.core.client.Scanner;
@@ -30,6 +29,9 @@ import org.apache.accumulo.core.data.Mut
 import org.apache.accumulo.core.data.PartialKey;
 import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.util.MetadataTable;
+import org.apache.accumulo.core.util.RootTable;
 import org.apache.accumulo.server.ServerConstants;
 import org.apache.accumulo.server.cli.ClientOpts;
 import org.apache.hadoop.conf.Configuration;
@@ -46,12 +48,12 @@ public class AddFilesWithMissingEntries 
   static final Logger log = Logger.getLogger(AddFilesWithMissingEntries.class);
   
   public static class Opts extends ClientOpts {
-    @Parameter(names = "-update", description = "Make changes to the " + Constants.METADATA_TABLE_NAME + " table to include missing files")
+    @Parameter(names = "-update", description = "Make changes to the " + MetadataTable.NAME + " table to include missing files")
     boolean update = false;
   }
   
   /**
-   * A utility to add files to the {@value Constants#METADATA_TABLE_NAME} table that are not listed in the root tablet. This is a recovery tool for someone who
+   * A utility to add files to the {@value MetadataTable#NAME} table that are not listed in the root tablet. This is a recovery tool for someone who
    * knows what they are doing. It might be better to save off files, and recover your instance by re-initializing and importing the existing files.
    */
   public static void main(String[] args) throws Exception {
@@ -59,9 +61,9 @@ public class AddFilesWithMissingEntries 
     BatchWriterOpts bwOpts = new BatchWriterOpts();
     opts.parseArgs(AddFilesWithMissingEntries.class.getName(), args, bwOpts);
     
-    final Key rootTableEnd = new Key(Constants.ROOT_TABLET_EXTENT.getEndRow());
-    final Range range = new Range(rootTableEnd.followingKey(PartialKey.ROW), true, Constants.METADATA_RESERVED_KEYSPACE_START_KEY, false);
-    final Scanner scanner = opts.getConnector().createScanner(Constants.METADATA_TABLE_NAME, Constants.NO_AUTHS);
+    final Key rootTableEnd = new Key(RootTable.EXTENT.getEndRow());
+    final Range range = new Range(rootTableEnd.followingKey(PartialKey.ROW), true, MetadataTable.RESERVED_RANGE_START_KEY, false);
+    final Scanner scanner = opts.getConnector().createScanner(MetadataTable.NAME, Authorizations.EMPTY);
     scanner.setRange(range);
     final Configuration conf = new Configuration();
     final FileSystem fs = FileSystem.get(conf);
@@ -87,10 +89,10 @@ public class AddFilesWithMissingEntries 
         knownFiles.clear();
         last = ke;
       }
-      if (Constants.METADATA_DIRECTORY_COLUMN.hasColumns(key)) {
+      if (MetadataTable.DIRECTORY_COLUMN.hasColumns(key)) {
         directory = entry.getValue().toString();
         log.debug("Found directory " + directory + " for row " + key.getRow().toString());
-      } else if (key.compareColumnFamily(Constants.METADATA_DATAFILE_COLUMN_FAMILY) == 0) {
+      } else if (key.compareColumnFamily(MetadataTable.DATAFILE_COLUMN_FAMILY) == 0) {
         String filename = key.getColumnQualifier().toString();
         knownFiles.add(filename);
         log.debug("METADATA file found: " + filename);
@@ -110,20 +112,22 @@ public class AddFilesWithMissingEntries 
     final String tableId = ke.getTableId().toString();
     final Text row = ke.getMetadataEntry();
     log.info(row.toString());
-    final Path path = new Path(ServerConstants.getTablesDir() + "/" + tableId + directory);
-    for (FileStatus file : fs.listStatus(path)) {
-      if (file.getPath().getName().endsWith("_tmp") || file.getPath().getName().endsWith("_tmp.rf"))
-        continue;
-      final String filename = directory + "/" + file.getPath().getName();
-      if (!knownFiles.contains(filename)) {
-        count++;
-        final Mutation m = new Mutation(row);
-        String size = Long.toString(file.getLen());
-        String entries = "1"; // lie
-        String value = size + "," + entries;
-        m.put(Constants.METADATA_DATAFILE_COLUMN_FAMILY, new Text(filename), new Value(value.getBytes()));
-        if (update) {
-          writer.getBatchWriter(Constants.METADATA_TABLE_NAME).addMutation(m);
+    for (String dir : ServerConstants.getTablesDirs()) {
+      final Path path = new Path(dir + "/" + tableId + directory);
+      for (FileStatus file : fs.listStatus(path)) {
+        if (file.getPath().getName().endsWith("_tmp") || file.getPath().getName().endsWith("_tmp.rf"))
+          continue;
+        final String filename = directory + "/" + file.getPath().getName();
+        if (!knownFiles.contains(filename)) {
+          count++;
+          final Mutation m = new Mutation(row);
+          String size = Long.toString(file.getLen());
+          String entries = "1"; // lie
+          String value = size + "," + entries;
+          m.put(MetadataTable.DATAFILE_COLUMN_FAMILY, new Text(filename), new Value(value.getBytes()));
+          if (update) {
+            writer.getBatchWriter(MetadataTable.NAME).addMutation(m);
+          }
         }
       }
     }

Modified: accumulo/branches/ACCUMULO-CURATOR/server/src/main/java/org/apache/accumulo/server/util/Admin.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-CURATOR/server/src/main/java/org/apache/accumulo/server/util/Admin.java?rev=1496226&r1=1496225&r2=1496226&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-CURATOR/server/src/main/java/org/apache/accumulo/server/util/Admin.java (original)
+++ accumulo/branches/ACCUMULO-CURATOR/server/src/main/java/org/apache/accumulo/server/util/Admin.java Mon Jun 24 21:34:20 2013
@@ -22,7 +22,6 @@ import java.util.List;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.client.AccumuloSecurityException;
 import org.apache.accumulo.core.client.Connector;
@@ -34,6 +33,7 @@ import org.apache.accumulo.core.client.s
 import org.apache.accumulo.core.master.thrift.MasterClientService;
 import org.apache.accumulo.core.security.CredentialHelper;
 import org.apache.accumulo.core.security.thrift.TCredentials;
+import org.apache.accumulo.core.util.MetadataTable;
 import org.apache.accumulo.server.cli.ClientOpts;
 import org.apache.accumulo.server.client.HdfsZooInstance;
 import org.apache.accumulo.server.security.SecurityConstants;
@@ -132,7 +132,7 @@ public class Admin {
           Connector conn = instance.getConnector(principal, token);
           Set<String> tables = conn.tableOperations().tableIdMap().keySet();
           for (String table : tables) {
-            if (table.equals(Constants.METADATA_TABLE_NAME))
+            if (table.equals(MetadataTable.NAME))
               continue;
             try {
               conn.tableOperations().flush(table, null, null, false);

Modified: accumulo/branches/ACCUMULO-CURATOR/server/src/main/java/org/apache/accumulo/server/util/CheckForMetadataProblems.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-CURATOR/server/src/main/java/org/apache/accumulo/server/util/CheckForMetadataProblems.java?rev=1496226&r1=1496225&r2=1496226&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-CURATOR/server/src/main/java/org/apache/accumulo/server/util/CheckForMetadataProblems.java (original)
+++ accumulo/branches/ACCUMULO-CURATOR/server/src/main/java/org/apache/accumulo/server/util/CheckForMetadataProblems.java Mon Jun 24 21:34:20 2013
@@ -23,8 +23,6 @@ import java.util.Map.Entry;
 import java.util.Set;
 import java.util.TreeSet;
 
-import org.apache.accumulo.core.Constants;
-import org.apache.accumulo.server.cli.ClientOpts;
 import org.apache.accumulo.core.client.AccumuloSecurityException;
 import org.apache.accumulo.core.client.Scanner;
 import org.apache.accumulo.core.client.impl.Writer;
@@ -32,11 +30,13 @@ import org.apache.accumulo.core.data.Key
 import org.apache.accumulo.core.data.KeyExtent;
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.security.CredentialHelper;
 import org.apache.accumulo.core.tabletserver.thrift.ConstraintViolationException;
-import org.apache.accumulo.core.util.CachedConfiguration;
+import org.apache.accumulo.server.cli.ClientOpts;
 import org.apache.accumulo.server.conf.ServerConfiguration;
-import org.apache.hadoop.fs.FileSystem;
+import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.server.fs.VolumeManagerImpl;
 import org.apache.hadoop.io.Text;
 
 import com.beust.jcommander.Parameter;
@@ -97,7 +97,7 @@ public class CheckForMetadataProblems {
       sawProblems = true;
   }
   
-  public static void checkMetadataTableEntries(Opts opts, FileSystem fs) throws Exception {
+  public static void checkMetadataTableEntries(Opts opts, VolumeManager fs) throws Exception {
     Map<String,TreeSet<KeyExtent>> tables = new HashMap<String,TreeSet<KeyExtent>>();
     
     Scanner scanner;
@@ -105,12 +105,12 @@ public class CheckForMetadataProblems {
     if (opts.offline) {
       scanner = new OfflineMetadataScanner(ServerConfiguration.getSystemConfiguration(opts.getInstance()), fs);
     } else {
-      scanner =  opts.getConnector().createScanner(Constants.METADATA_TABLE_NAME, Constants.NO_AUTHS);
+      scanner = opts.getConnector().createScanner(MetadataTable.NAME, Authorizations.EMPTY);
     }
     
-    scanner.setRange(Constants.METADATA_KEYSPACE);
-    Constants.METADATA_PREV_ROW_COLUMN.fetch(scanner);
-    scanner.fetchColumnFamily(Constants.METADATA_CURRENT_LOCATION_COLUMN_FAMILY);
+    scanner.setRange(MetadataTable.KEYSPACE);
+    MetadataTable.PREV_ROW_COLUMN.fetch(scanner);
+    scanner.fetchColumnFamily(MetadataTable.CURRENT_LOCATION_COLUMN_FAMILY);
     
     Text colf = new Text();
     Text colq = new Text();
@@ -140,11 +140,11 @@ public class CheckForMetadataProblems {
         tables.put(tableName, tablets);
       }
       
-      if (Constants.METADATA_PREV_ROW_COLUMN.equals(colf, colq)) {
+      if (MetadataTable.PREV_ROW_COLUMN.equals(colf, colq)) {
         KeyExtent tabletKe = new KeyExtent(entry.getKey().getRow(), entry.getValue());
         tablets.add(tabletKe);
         justLoc = false;
-      } else if (colf.equals(Constants.METADATA_CURRENT_LOCATION_COLUMN_FAMILY)) {
+      } else if (colf.equals(MetadataTable.CURRENT_LOCATION_COLUMN_FAMILY)) {
         if (justLoc) {
           System.out.println("Problem at key " + entry.getKey());
           sawProblems = true;
@@ -166,7 +166,7 @@ public class CheckForMetadataProblems {
     }
     
     if (count == 0) {
-      System.err.println("ERROR : " + Constants.METADATA_TABLE_NAME + " table is empty");
+      System.err.println("ERROR : " + MetadataTable.NAME + " table is empty");
       sawProblems = true;
     }
     
@@ -180,19 +180,18 @@ public class CheckForMetadataProblems {
   }
   
   static class Opts extends ClientOpts {
-    @Parameter(names="--fix", description="best-effort attempt to fix problems found")
+    @Parameter(names = "--fix", description = "best-effort attempt to fix problems found")
     boolean fix = false;
     
-    @Parameter(names="--offline", description="perform the check on the files directly")
+    @Parameter(names = "--offline", description = "perform the check on the files directly")
     boolean offline = false;
   }
   
-  
   public static void main(String[] args) throws Exception {
     Opts opts = new Opts();
     opts.parseArgs(CheckForMetadataProblems.class.getName(), args);
     
-    FileSystem fs = FileSystem.get(CachedConfiguration.getInstance());
+    VolumeManager fs = VolumeManagerImpl.get();
     checkMetadataTableEntries(opts, fs);
     opts.stopTracing();
     if (sawProblems)

Modified: accumulo/branches/ACCUMULO-CURATOR/server/src/main/java/org/apache/accumulo/server/util/FindOfflineTablets.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-CURATOR/server/src/main/java/org/apache/accumulo/server/util/FindOfflineTablets.java?rev=1496226&r1=1496225&r2=1496226&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-CURATOR/server/src/main/java/org/apache/accumulo/server/util/FindOfflineTablets.java (original)
+++ accumulo/branches/ACCUMULO-CURATOR/server/src/main/java/org/apache/accumulo/server/util/FindOfflineTablets.java Mon Jun 24 21:34:20 2013
@@ -20,10 +20,11 @@ import java.util.Iterator;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.client.Instance;
 import org.apache.accumulo.core.conf.DefaultConfiguration;
 import org.apache.accumulo.core.master.state.tables.TableState;
+import org.apache.accumulo.core.util.MetadataTable;
+import org.apache.accumulo.core.util.RootTable;
 import org.apache.accumulo.server.cli.ClientOpts;
 import org.apache.accumulo.server.master.LiveTServerSet;
 import org.apache.accumulo.server.master.LiveTServerSet.Listener;
@@ -47,8 +48,8 @@ public class FindOfflineTablets {
     opts.parseArgs(FindOfflineTablets.class.getName(), args);
     final AtomicBoolean scanning = new AtomicBoolean(false);
     Instance instance = opts.getInstance();
-    MetaDataTableScanner rootScanner = new MetaDataTableScanner(instance, SecurityConstants.getSystemCredentials(), Constants.METADATA_ROOT_TABLET_KEYSPACE);
-    MetaDataTableScanner metaScanner = new MetaDataTableScanner(instance, SecurityConstants.getSystemCredentials(), Constants.NON_ROOT_METADATA_KEYSPACE);
+    MetaDataTableScanner rootScanner = new MetaDataTableScanner(instance, SecurityConstants.getSystemCredentials(), RootTable.METADATA_TABLETS_RANGE);
+    MetaDataTableScanner metaScanner = new MetaDataTableScanner(instance, SecurityConstants.getSystemCredentials(), MetadataTable.NON_ROOT_KEYSPACE);
     @SuppressWarnings("unchecked")
     Iterator<TabletLocationState> scanner = (Iterator<TabletLocationState>)new IteratorChain(rootScanner, metaScanner);
     LiveTServerSet tservers = new LiveTServerSet(instance, DefaultConfiguration.getDefaultConfiguration(), new Listener() {
@@ -66,7 +67,7 @@ public class FindOfflineTablets {
       TabletLocationState locationState = scanner.next();
       TabletState state = locationState.getState(tservers.getCurrentServers());
       if (state != null && state != TabletState.HOSTED && TableManager.getInstance().getTableState(locationState.extent.getTableId().toString()) != TableState.OFFLINE)
-        if (!locationState.extent.equals(Constants.ROOT_TABLET_EXTENT))
+        if (!locationState.extent.equals(RootTable.EXTENT))
           System.out.println(locationState + " is " + state + "  #walogs:" + locationState.walogs.size());
     }
   }



Mime
View raw message