accumulo-commits mailing list archives

From e..@apache.org
Subject svn commit: r1494759 [4/5] - in /accumulo/trunk: core/src/main/java/org/apache/accumulo/core/client/admin/ core/src/main/java/org/apache/accumulo/core/client/impl/ core/src/main/java/org/apache/accumulo/core/client/mock/ core/src/main/java/org/apache/a...
Date Wed, 19 Jun 2013 20:18:32 GMT
Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/log/DfsLogger.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/log/DfsLogger.java?rev=1494759&r1=1494758&r2=1494759&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/log/DfsLogger.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/log/DfsLogger.java Wed Jun 19 20:18:30 2013
@@ -29,7 +29,6 @@ import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.EnumSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -44,20 +43,15 @@ import org.apache.accumulo.core.data.Mut
 import org.apache.accumulo.core.util.Daemon;
 import org.apache.accumulo.core.util.StringUtil;
 import org.apache.accumulo.server.ServerConstants;
+import org.apache.accumulo.server.fs.VolumeManager;
 import org.apache.accumulo.server.logger.LogFileKey;
 import org.apache.accumulo.server.logger.LogFileValue;
 import org.apache.accumulo.server.master.state.TServerInstance;
 import org.apache.accumulo.server.tabletserver.TabletMutations;
-import org.apache.accumulo.server.trace.TraceFileSystem;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.util.Progressable;
 import org.apache.log4j.Logger;
-//import org.apache.hadoop.fs.CreateFlag;
-//import org.apache.hadoop.fs.Syncable;
 
 /**
  * Wrap a connection to a logger.
@@ -80,7 +74,7 @@ public class DfsLogger {
   public interface ServerResources {
     AccumuloConfiguration getConfiguration();
     
-    FileSystem getFileSystem();
+    VolumeManager getFileSystem();
     
     Set<TServerInstance> getCurrentTServers();
   }
@@ -207,13 +201,13 @@ public class DfsLogger {
     this.conf = conf;
   }
   
-  public DfsLogger(ServerResources conf, String logger, String filename) throws IOException {
+  public DfsLogger(ServerResources conf, String logger, Path filename) throws IOException {
     this.conf = conf;
     this.logger = logger;
-    this.logPath = new Path(ServerConstants.getWalDirectory(), filename);
+    this.logPath = filename;
   }
   
-  public static FSDataInputStream readHeader(FileSystem fs, Path path, Map<String,String> opts) throws IOException {
+  public static FSDataInputStream readHeader(VolumeManager fs, Path path, Map<String,String> opts) throws IOException {
     FSDataInputStream file = fs.open(path);
     try {
       byte[] magic = LOG_FILE_HEADER_V2.getBytes();
@@ -242,23 +236,20 @@ public class DfsLogger {
     logger = StringUtil.join(Arrays.asList(address.split(":")), "+");
     
     log.debug("DfsLogger.open() begin");
+    VolumeManager fs = conf.getFileSystem();
     
-    logPath = new Path(ServerConstants.getWalDirectory() + "/" + logger + "/" + filename);
+    logPath = new Path(fs.choose(ServerConstants.getWalDirs()) + "/" + logger + "/" + filename);
     try {
-      FileSystem fs = conf.getFileSystem();
       short replication = (short) conf.getConfiguration().getCount(Property.TSERV_WAL_REPLICATION);
       if (replication == 0)
-        replication = fs.getDefaultReplication();
+        replication = fs.getDefaultReplication(logPath);
       long blockSize = conf.getConfiguration().getMemoryInBytes(Property.TSERV_WAL_BLOCKSIZE);
       if (blockSize == 0)
         blockSize = (long) (conf.getConfiguration().getMemoryInBytes(Property.TSERV_WALOG_MAX_SIZE) * 1.1);
-      int checkSum = fs.getConf().getInt("io.bytes.per.checksum", 512);
-      blockSize -= blockSize % checkSum;
-      blockSize = Math.max(blockSize, checkSum);
       if (conf.getConfiguration().getBoolean(Property.TSERV_WAL_SYNC))
-        logFile = create(fs, logPath, true, fs.getConf().getInt("io.file.buffer.size", 4096), replication, blockSize);
+        logFile = fs.createSyncable(logPath, 0, replication, blockSize);
       else
-        logFile = fs.create(logPath, true, fs.getConf().getInt("io.file.buffer.size", 4096), replication, blockSize);
+        logFile = fs.create(logPath, true, 0, replication, blockSize);
       
       try {
         NoSuchMethodException e = null;
@@ -324,43 +315,6 @@ public class DfsLogger {
     t.start();
   }
   
-  private FSDataOutputStream create(FileSystem fs, Path logPath, boolean b, int buffersize, short replication, long blockSize) throws IOException {
-    try {
-      // This...
-      // EnumSet<CreateFlag> set = EnumSet.of(CreateFlag.SYNC_BLOCK, CreateFlag.CREATE);
-      // return fs.create(logPath, FsPermission.getDefault(), set, buffersize, replication, blockSize, null);
-      // Becomes this:
-      Class<?> createFlags = Class.forName("org.apache.hadoop.fs.CreateFlag");
-      List<Enum<?>> flags = new ArrayList<Enum<?>>();
-      if (createFlags.isEnum()) {
-        for (Object constant : createFlags.getEnumConstants()) {
-          if (constant.toString().equals("SYNC_BLOCK")) {
-            flags.add((Enum<?>) constant);
-            log.debug("Found synch enum " + constant);
-          }
-          if (constant.toString().equals("CREATE")) {
-            flags.add((Enum<?>) constant);
-            log.debug("Found CREATE enum " + constant);
-          }
-        }
-      }
-      Object set = EnumSet.class.getMethod("of", java.lang.Enum.class, java.lang.Enum.class).invoke(null, flags.get(0), flags.get(1));
-      log.debug("CreateFlag set: " + set);
-      if (fs instanceof TraceFileSystem) {
-        fs = ((TraceFileSystem) fs).getImplementation();
-      }
-      Method create = fs.getClass().getMethod("create", Path.class, FsPermission.class, EnumSet.class, Integer.TYPE, Short.TYPE, Long.TYPE, Progressable.class);
-      log.debug("creating " + logPath + " with SYNCH_BLOCK flag");
-      return (FSDataOutputStream) create.invoke(fs, logPath, FsPermission.getDefault(), set, buffersize, replication, blockSize, null);
-    } catch (ClassNotFoundException ex) {
-      // Expected in hadoop 1.0
-      return fs.create(logPath, b, buffersize, replication, blockSize);
-    } catch (Exception ex) {
-      log.debug(ex, ex);
-      return fs.create(logPath, b, buffersize, replication, blockSize);
-    }
-  }
-  
   @Override
   public String toString() {
     return getLogger() + "/" + getFileName();
@@ -371,7 +325,7 @@ public class DfsLogger {
   }
   
   public String getFileName() {
-    return logPath.getName();
+    return logPath.toString();
   }
   
   public void close() throws IOException {

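The DfsLogger hunks above swap the raw Hadoop FileSystem for the new VolumeManager abstraction: the WAL path is now chosen per log via fs.choose(ServerConstants.getWalDirs()), and the Hadoop 1/2 reflection workaround for SYNC_BLOCK file creation (the deleted create() method) moves down behind fs.createSyncable(). A minimal, self-contained sketch of the choosing idea follows; the random strategy is an assumption for illustration, since this diff does not show VolumeManager.choose()'s actual policy:

    import java.util.Random;

    // Sketch of the volume-choosing idea behind fs.choose() above.
    // The random strategy is an assumption; the real policy lives in
    // org.apache.accumulo.server.fs and is not shown in this diff.
    public class RandomVolumeChooser {
      private final Random random = new Random();

      // pick one of the configured WAL base directories, spreading
      // write-ahead logs across volumes
      public String choose(String[] walDirs) {
        return walDirs[random.nextInt(walDirs.length)];
      }

      public static void main(String[] args) {
        String[] walDirs = {"hdfs://nn1/accumulo/wal", "hdfs://nn2/accumulo/wal"};
        String base = new RandomVolumeChooser().choose(walDirs);
        // DfsLogger.open() then appends "/" + logger + "/" + filename
        System.out.println(base + "/tserver+9997/some-wal-file");
      }
    }

Choosing a base directory per log is what allows getWalDirs() to return more than one volume in the first place.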
Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/log/LogSorter.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/log/LogSorter.java?rev=1494759&r1=1494758&r2=1494759&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/log/LogSorter.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/log/LogSorter.java Wed Jun 19 20:18:30 2013
@@ -37,7 +37,7 @@ import org.apache.accumulo.core.master.t
 import org.apache.accumulo.core.util.Pair;
 import org.apache.accumulo.core.util.SimpleThreadPool;
 import org.apache.accumulo.core.zookeeper.ZooUtil;
-import org.apache.accumulo.server.ServerConstants;
+import org.apache.accumulo.server.fs.VolumeManager;
 import org.apache.accumulo.server.logger.LogFileKey;
 import org.apache.accumulo.server.logger.LogFileValue;
 import org.apache.accumulo.server.zookeeper.DistributedWorkQueue;
@@ -55,7 +55,7 @@ import org.apache.zookeeper.KeeperExcept
 public class LogSorter {
   
   private static final Logger log = Logger.getLogger(LogSorter.class);
-  FileSystem fs;
+  VolumeManager fs;
   AccumuloConfiguration conf;
   
   private final Map<String,LogProcessor> currentWork = Collections.synchronizedMap(new HashMap<String,LogProcessor>());
@@ -75,21 +75,24 @@ public class LogSorter {
     
     @Override
     public void process(String child, byte[] data) {
-      String dest = ServerConstants.getRecoveryDir() + "/" + child;
-      String src = new String(data);
-      String name = new Path(src).getName();
+      String work = new String(data);
+      String[] parts = work.split("\\|");
+      String src = parts[0];
+      String dest = parts[1];
+      String sortId = new Path(src).getName();
+      log.debug("Sorting " + src + " to " + dest + " using sortId " + sortId);
       
       synchronized (currentWork) {
-        if (currentWork.containsKey(name))
+        if (currentWork.containsKey(sortId))
           return;
-        currentWork.put(name, this);
+        currentWork.put(sortId, this);
       }
       
       try {
         log.info("Copying " + src + " to " + dest);
-        sort(name, new Path(src), dest);
+        sort(sortId, new Path(src), dest);
       } finally {
-        currentWork.remove(name);
+        currentWork.remove(sortId);
       }
       
     }
@@ -105,7 +108,7 @@ public class LogSorter {
       try {
         
         // the following call does not throw an exception if the file/dir does not exist
-        fs.delete(new Path(destPath), true);
+        fs.deleteRecursively(new Path(destPath));
         
         FSDataInputStream tmpInput = fs.open(srcPath);
         DataInputStream tmpDecryptingInput = tmpInput;
@@ -193,8 +196,9 @@ public class LogSorter {
     }
     
     private void writeBuffer(String destPath, ArrayList<Pair<LogFileKey,LogFileValue>> buffer, int part) throws IOException {
-      String path = destPath + String.format("/part-r-%05d", part++);
-      MapFile.Writer output = new MapFile.Writer(fs.getConf(), fs, path, LogFileKey.class, LogFileValue.class);
+      Path path = new Path(destPath, String.format("part-r-%05d", part++));
+      FileSystem ns = fs.getFileSystemByPath(path);
+      MapFile.Writer output = new MapFile.Writer(ns.getConf(), ns, path.toString(), LogFileKey.class, LogFileValue.class);
       try {
         Collections.sort(buffer, new Comparator<Pair<LogFileKey,LogFileValue>>() {
           @Override
@@ -234,7 +238,7 @@ public class LogSorter {
   ThreadPoolExecutor threadPool;
   private final Instance instance;
   
-  public LogSorter(Instance instance, FileSystem fs, AccumuloConfiguration conf) {
+  public LogSorter(Instance instance, VolumeManager fs, AccumuloConfiguration conf) {
     this.instance = instance;
     this.fs = fs;
     this.conf = conf;

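With multiple volumes, the recovery destination can no longer be derived from a single ServerConstants.getRecoveryDir() (note its removed import above), so the distributed work queue entry now carries both halves as a pipe-separated "src|dest" pair, with the dedup key (sortId) taken from the source file name. A self-contained sketch of that format; the class and helper names are illustrative, not Accumulo API:

    // Sketch of the work-queue entry parsed in LogProcessor.process() above.
    public class SortWorkEntry {
      final String src;     // full path of the unsorted write-ahead log
      final String dest;    // directory the sorted output is written to
      final String sortId;  // file name portion of src, used to dedupe work

      SortWorkEntry(String src, String dest) {
        this.src = src;
        this.dest = dest;
        this.sortId = src.substring(src.lastIndexOf('/') + 1);
      }

      static SortWorkEntry parse(byte[] data) {
        String[] parts = new String(data).split("\\|"); // '|' is regex alternation, so it is escaped
        return new SortWorkEntry(parts[0], parts[1]);
      }

      byte[] encode() {
        return (src + "|" + dest).getBytes();
      }

      public static void main(String[] args) {
        byte[] data = new SortWorkEntry(
            "hdfs://nn1/accumulo/wal/tserver+9997/uuid-1234",
            "hdfs://nn2/accumulo/recovery/uuid-1234").encode();
        SortWorkEntry e = parse(data);
        System.out.println(e.sortId + ": " + e.src + " -> " + e.dest);
      }
    }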
Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/log/MultiReader.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/log/MultiReader.java?rev=1494759&r1=1494758&r2=1494759&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/log/MultiReader.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/log/MultiReader.java Wed Jun 19 20:18:30 2013
@@ -19,8 +19,8 @@ package org.apache.accumulo.server.table
 import java.io.EOFException;
 import java.io.IOException;
 
+import org.apache.accumulo.server.fs.VolumeManager;
 import org.apache.commons.collections.buffer.PriorityBuffer;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -87,16 +87,17 @@ public class MultiReader {
   
   private PriorityBuffer heap = new PriorityBuffer();
   
-  public MultiReader(FileSystem fs, Configuration conf, String directory) throws IOException {
+  public MultiReader(VolumeManager fs, Path directory) throws IOException {
     boolean foundFinish = false;
-    for (FileStatus child : fs.listStatus(new Path(directory))) {
+    for (FileStatus child : fs.listStatus(directory)) {
       if (child.getPath().getName().startsWith("_"))
         continue;
       if (child.getPath().getName().equals("finished")) {
         foundFinish = true;
         continue;
       }
-      heap.add(new Index(new Reader(fs, child.getPath().toString(), conf)));
+      FileSystem ns = fs.getFileSystemByPath(child.getPath());
+      heap.add(new Index(new Reader(ns, child.getPath().toString(), ns.getConf())));
     }
     if (!foundFinish)
       throw new IOException("Sort \"finished\" flag not found in " + directory);

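MultiReader shows the idiom repeated throughout this commit: Hadoop classes such as MapFile.Reader still want a concrete FileSystem plus Configuration, so callers ask the VolumeManager which filesystem owns a given path and unwrap there. A hedged sketch of that resolution step, using Path.getFileSystem() as a stand-in for the real VolumeManager.getFileSystemByPath() lookup:

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class PerPathFs {
      public static FileSystem forPath(Path path, Configuration conf) throws IOException {
        // each volume may live on a different namenode, so the filesystem is
        // derived from the path's URI rather than taken from the default fs
        return path.getFileSystem(conf);
      }

      public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        FileSystem fs = forPath(new Path("file:///tmp"), conf);
        System.out.println(fs.getUri()); // file:///
      }
    }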
Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/log/SortedLogRecovery.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/log/SortedLogRecovery.java?rev=1494759&r1=1494758&r2=1494759&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/log/SortedLogRecovery.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/log/SortedLogRecovery.java Wed Jun 19 20:18:30 2013
@@ -29,14 +29,10 @@ import java.util.Set;
 
 import org.apache.accumulo.core.data.KeyExtent;
 import org.apache.accumulo.core.data.Mutation;
-import org.apache.accumulo.core.file.FileUtil;
-import org.apache.accumulo.core.util.CachedConfiguration;
-import org.apache.accumulo.server.conf.ServerConfiguration;
+import org.apache.accumulo.server.fs.VolumeManager;
 import org.apache.accumulo.server.logger.LogFileKey;
 import org.apache.accumulo.server.logger.LogFileValue;
-import org.apache.accumulo.server.trace.TraceFileSystem;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.log4j.Logger;
 
 /**
@@ -58,7 +54,11 @@ public class SortedLogRecovery {
     public UnusedException() { super(); }
   }
 
-  public SortedLogRecovery() {}
+  private VolumeManager fs;
+
+  public SortedLogRecovery(VolumeManager fs) {
+    this.fs = fs;
+  }
   
   private enum Status {
     INITIAL, LOOKING_FOR_FINISH, COMPLETE
@@ -89,15 +89,13 @@ public class SortedLogRecovery {
     }
   }
   
-  public void recover(KeyExtent extent, List<String> recoveryLogs, Set<String> tabletFiles, MutationReceiver mr) throws IOException {
-    Configuration conf = CachedConfiguration.getInstance();
-    FileSystem fs = TraceFileSystem.wrap(FileUtil.getFileSystem(conf, ServerConfiguration.getSiteConfiguration()));
+  public void recover(KeyExtent extent, List<Path> recoveryLogs, Set<String> tabletFiles, MutationReceiver mr) throws IOException {
     int[] tids = new int[recoveryLogs.size()];
     LastStartToFinish lastStartToFinish = new LastStartToFinish();
     for (int i = 0; i < recoveryLogs.size(); i++) {
-      String logfile = recoveryLogs.get(i);
+      Path logfile = recoveryLogs.get(i);
       log.info("Looking at mutations from " + logfile + " for " + extent);
-      MultiReader reader = new MultiReader(fs, conf, logfile);
+      MultiReader reader = new MultiReader(fs, logfile);
       try {
         try {
           tids[i] = findLastStartToFinish(reader, i, extent, tabletFiles, lastStartToFinish);
@@ -122,8 +120,8 @@ public class SortedLogRecovery {
       throw new RuntimeException("COMPACTION_FINISH (without preceding COMPACTION_START) not followed by successful minor compaction");
     
     for (int i = 0; i < recoveryLogs.size(); i++) {
-      String logfile = recoveryLogs.get(i);
-      MultiReader reader = new MultiReader(fs, conf, logfile);
+      Path logfile = recoveryLogs.get(i);
+      MultiReader reader = new MultiReader(fs, logfile);
       try {
         playbackMutations(reader, tids[i], lastStartToFinish, mr);
       } finally {
@@ -191,6 +189,7 @@ public class SortedLogRecovery {
         lastStartToFinish.update(fileno, key.seq);
         
         // Tablet server finished the minor compaction, but didn't remove the entry from the METADATA table.
+        log.error("filename in compaction start " + key.filename);
         if (tabletFiles.contains(key.filename))
           lastStartToFinish.update(-1);
       } else if (key.event == COMPACTION_FINISH) {

Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/log/TabletServerLogger.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/log/TabletServerLogger.java?rev=1494759&r1=1494758&r2=1494759&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/log/TabletServerLogger.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/log/TabletServerLogger.java Wed Jun 19 20:18:30 2013
@@ -34,11 +34,13 @@ import org.apache.accumulo.core.conf.Pro
 import org.apache.accumulo.core.data.KeyExtent;
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.server.fs.VolumeManager;
 import org.apache.accumulo.server.tabletserver.Tablet;
 import org.apache.accumulo.server.tabletserver.Tablet.CommitSession;
 import org.apache.accumulo.server.tabletserver.TabletMutations;
 import org.apache.accumulo.server.tabletserver.TabletServer;
 import org.apache.accumulo.server.tabletserver.log.DfsLogger.LoggerOperation;
+import org.apache.hadoop.fs.Path;
 import org.apache.log4j.Logger;
 
 /**
@@ -413,11 +415,11 @@ public class TabletServerLogger {
     return seq;
   }
   
-  public void recover(Tablet tablet, List<String> logs, Set<String> tabletFiles, MutationReceiver mr) throws IOException {
+  public void recover(VolumeManager fs, Tablet tablet, List<Path> logs, Set<String> tabletFiles, MutationReceiver mr) throws IOException {
     if (!enabled(tablet))
       return;
     try {
-      SortedLogRecovery recovery = new SortedLogRecovery();
+      SortedLogRecovery recovery = new SortedLogRecovery(fs);
       KeyExtent extent = tablet.getExtent();
       recovery.recover(extent, logs, tabletFiles, mr);
     } catch (Exception e) {

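Taken together, the SortedLogRecovery and TabletServerLogger hunks replace per-call filesystem construction (the removed CachedConfiguration plus TraceFileSystem.wrap lines) with a single VolumeManager threaded from the tablet server down to MultiReader. A compilable stand-in sketch of that injection chain; every type below is a simplified placeholder for the real Accumulo class named in the comment:

    import java.util.List;

    interface Volumes {}                                 // ~ VolumeManager

    class RecoverySketch {                               // ~ SortedLogRecovery
      private final Volumes fs;

      RecoverySketch(Volumes fs) {
        this.fs = fs;                                    // injected once, used for every MultiReader
      }

      void recover(List<String> recoveryLogs) {
        for (String logDir : recoveryLogs) {
          // real code: MultiReader reader = new MultiReader(fs, logfile);
          System.out.println("would open " + logDir + " via " + fs);
        }
      }
    }

    class LoggerSketch {                                 // ~ TabletServerLogger
      // real signature: recover(VolumeManager fs, Tablet tablet, List<Path> logs, ...)
      void recover(Volumes fs, List<String> logs) {
        new RecoverySketch(fs).recover(logs);            // pass fs down instead of rebuilding it
      }
    }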
Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/trace/TraceServer.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/trace/TraceServer.java?rev=1494759&r1=1494758&r2=1494759&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/trace/TraceServer.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/trace/TraceServer.java Wed Jun 19 20:18:30 2013
@@ -37,18 +37,18 @@ import org.apache.accumulo.core.conf.Acc
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.file.FileUtil;
 import org.apache.accumulo.core.iterators.user.AgeOffFilter;
 import org.apache.accumulo.core.security.SecurityUtil;
 import org.apache.accumulo.core.trace.TraceFormatter;
 import org.apache.accumulo.core.util.AddressUtil;
-import org.apache.accumulo.core.util.CachedConfiguration;
 import org.apache.accumulo.core.util.UtilWaitThread;
 import org.apache.accumulo.core.zookeeper.ZooUtil;
 import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
 import org.apache.accumulo.server.Accumulo;
 import org.apache.accumulo.server.client.HdfsZooInstance;
 import org.apache.accumulo.server.conf.ServerConfiguration;
+import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.server.fs.VolumeManagerImpl;
 import org.apache.accumulo.server.util.time.SimpleTimer;
 import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
 import org.apache.accumulo.start.classloader.AccumuloClassLoader;
@@ -56,7 +56,6 @@ import org.apache.accumulo.trace.instrum
 import org.apache.accumulo.trace.thrift.RemoteSpan;
 import org.apache.accumulo.trace.thrift.SpanReceiver.Iface;
 import org.apache.accumulo.trace.thrift.SpanReceiver.Processor;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.io.Text;
 import org.apache.log4j.Logger;
 import org.apache.thrift.TByteArrayOutputStream;
@@ -259,7 +258,7 @@ public class TraceServer implements Watc
     SecurityUtil.serverLogin();
     Instance instance = HdfsZooInstance.getInstance();
     ServerConfiguration conf = new ServerConfiguration(instance);
-    FileSystem fs = FileUtil.getFileSystem(CachedConfiguration.getInstance(), conf.getConfiguration());
+    VolumeManager fs = VolumeManagerImpl.get();
     Accumulo.init(fs, conf, "tracer");
     String hostname = Accumulo.getLocalAddress(args);
     TraceServer server = new TraceServer(conf, hostname);

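TraceServer now obtains its filesystem through VolumeManagerImpl.get() rather than building one from CachedConfiguration; CheckForMetadataProblems and Initialize below switch to the same entry point. A sketch of such a process-wide accessor, assuming (the diff does not show this) that the real implementation lazily builds one instance from site configuration:

    public final class FsHolder {
      private static volatile Object fs;  // stands in for a VolumeManager

      private FsHolder() {}

      public static Object get() {
        Object local = fs;
        if (local == null) {
          synchronized (FsHolder.class) {
            local = fs;
            if (local == null)
              fs = local = buildFromSiteConfig();
          }
        }
        return local;
      }

      private static Object buildFromSiteConfig() {
        // the real implementation would construct its volumes from
        // instance.dfs.uri and related site configuration
        return new Object();
      }
    }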
Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/util/AddFilesWithMissingEntries.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/util/AddFilesWithMissingEntries.java?rev=1494759&r1=1494758&r2=1494759&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/util/AddFilesWithMissingEntries.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/util/AddFilesWithMissingEntries.java Wed Jun 19 20:18:30 2013
@@ -112,20 +112,22 @@ public class AddFilesWithMissingEntries 
     final String tableId = ke.getTableId().toString();
     final Text row = ke.getMetadataEntry();
     log.info(row.toString());
-    final Path path = new Path(ServerConstants.getTablesDir() + "/" + tableId + directory);
-    for (FileStatus file : fs.listStatus(path)) {
-      if (file.getPath().getName().endsWith("_tmp") || file.getPath().getName().endsWith("_tmp.rf"))
-        continue;
-      final String filename = directory + "/" + file.getPath().getName();
-      if (!knownFiles.contains(filename)) {
-        count++;
-        final Mutation m = new Mutation(row);
-        String size = Long.toString(file.getLen());
-        String entries = "1"; // lie
-        String value = size + "," + entries;
-        m.put(MetadataTable.DATAFILE_COLUMN_FAMILY, new Text(filename), new Value(value.getBytes()));
-        if (update) {
-          writer.getBatchWriter(MetadataTable.NAME).addMutation(m);
+    for (String dir : ServerConstants.getTablesDirs()) {
+      final Path path = new Path(dir + "/" + tableId + directory);
+      for (FileStatus file : fs.listStatus(path)) {
+        if (file.getPath().getName().endsWith("_tmp") || file.getPath().getName().endsWith("_tmp.rf"))
+          continue;
+        final String filename = directory + "/" + file.getPath().getName();
+        if (!knownFiles.contains(filename)) {
+          count++;
+          final Mutation m = new Mutation(row);
+          String size = Long.toString(file.getLen());
+          String entries = "1"; // lie
+          String value = size + "," + entries;
+          m.put(MetadataTable.DATAFILE_COLUMN_FAMILY, new Text(filename), new Value(value.getBytes()));
+          if (update) {
+            writer.getBatchWriter(MetadataTable.NAME).addMutation(m);
+          }
         }
       }
     }

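Because tables may now live under several base directories, AddFilesWithMissingEntries wraps its old scan in a loop over ServerConstants.getTablesDirs(), probing each volume for the tablet's directory. A self-contained sketch of the candidate-path construction; the names and URIs are illustrative:

    import java.util.ArrayList;
    import java.util.List;

    public class MultiDirScan {
      static List<String> candidateTabletDirs(String[] tablesDirs, String tableId, String directory) {
        List<String> result = new ArrayList<String>();
        for (String dir : tablesDirs)
          result.add(dir + "/" + tableId + directory);  // same concatenation as the diff
        return result;
      }

      public static void main(String[] args) {
        String[] tablesDirs = {"hdfs://nn1/accumulo/tables", "hdfs://nn2/accumulo/tables"};
        for (String p : candidateTabletDirs(tablesDirs, "2", "/default_tablet"))
          System.out.println(p);
      }
    }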
Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/util/CheckForMetadataProblems.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/util/CheckForMetadataProblems.java?rev=1494759&r1=1494758&r2=1494759&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/util/CheckForMetadataProblems.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/util/CheckForMetadataProblems.java Wed Jun 19 20:18:30 2013
@@ -33,10 +33,10 @@ import org.apache.accumulo.core.data.Val
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.security.CredentialHelper;
 import org.apache.accumulo.core.tabletserver.thrift.ConstraintViolationException;
-import org.apache.accumulo.core.util.CachedConfiguration;
 import org.apache.accumulo.server.cli.ClientOpts;
 import org.apache.accumulo.server.conf.ServerConfiguration;
-import org.apache.hadoop.fs.FileSystem;
+import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.server.fs.VolumeManagerImpl;
 import org.apache.hadoop.io.Text;
 
 import com.beust.jcommander.Parameter;
@@ -97,7 +97,7 @@ public class CheckForMetadataProblems {
       sawProblems = true;
   }
   
-  public static void checkMetadataTableEntries(Opts opts, FileSystem fs) throws Exception {
+  public static void checkMetadataTableEntries(Opts opts, VolumeManager fs) throws Exception {
     Map<String,TreeSet<KeyExtent>> tables = new HashMap<String,TreeSet<KeyExtent>>();
     
     Scanner scanner;
@@ -191,7 +191,7 @@ public class CheckForMetadataProblems {
     Opts opts = new Opts();
     opts.parseArgs(CheckForMetadataProblems.class.getName(), args);
     
-    FileSystem fs = FileSystem.get(CachedConfiguration.getInstance());
+    VolumeManager fs = VolumeManagerImpl.get();
     checkMetadataTableEntries(opts, fs);
     opts.stopTracing();
     if (sawProblems)

Added: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/util/FileUtil.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/util/FileUtil.java?rev=1494759&view=auto
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/util/FileUtil.java (added)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/util/FileUtil.java Wed Jun 19 20:18:30 2013
@@ -0,0 +1,551 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.util;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.data.PartialKey;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.file.FileOperations;
+import org.apache.accumulo.core.file.FileSKVIterator;
+import org.apache.accumulo.core.file.FileSKVWriter;
+import org.apache.accumulo.core.file.rfile.RFile;
+import org.apache.accumulo.core.file.rfile.RFileOperations;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.iterators.system.MultiIterator;
+import org.apache.accumulo.core.util.CachedConfiguration;
+import org.apache.accumulo.core.util.LocalityGroupUtil;
+import org.apache.accumulo.server.fs.FileRef;
+import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.log4j.Logger;
+
+public class FileUtil {
+  
+  public static class FileInfo {
+    Key firstKey = new Key();
+    Key lastKey = new Key();
+    
+    public FileInfo(Key firstKey, Key lastKey) {
+      this.firstKey = firstKey;
+      this.lastKey = lastKey;
+    }
+    
+    public Text getFirstRow() {
+      return firstKey.getRow();
+    }
+    
+    public Text getLastRow() {
+      return lastKey.getRow();
+    }
+  }
+  
+  private static final Logger log = Logger.getLogger(FileUtil.class);
+  
+  private static String createTmpDir(AccumuloConfiguration acuConf, VolumeManager fs) throws IOException {
+    String accumuloDir = acuConf.get(Property.INSTANCE_DFS_DIR);
+    
+    String tmpDir = null;
+    while (tmpDir == null) {
+      tmpDir = accumuloDir + "/tmp/idxReduce_" + String.format("%09d", (int) (Math.random() * Integer.MAX_VALUE));
+      
+      try {
+        fs.getFileStatus(new Path(tmpDir));
+        tmpDir = null;
+        continue;
+      } catch (FileNotFoundException fne) {
+        // found an unused temp directory
+      }
+      
+      fs.mkdirs(new Path(tmpDir));
+      
+      // try to reserve the tmp dir
+      if (!fs.createNewFile(new Path(tmpDir + "/__reserve")))
+        tmpDir = null;
+    }
+    
+    return tmpDir;
+  }
+  
+  public static Collection<FileRef> reduceFiles(AccumuloConfiguration acuConf, Configuration conf, VolumeManager fs, Text prevEndRow, Text endRow,
+      Collection<FileRef> mapFiles, int maxFiles, String tmpDir, int pass) throws IOException {
+    ArrayList<FileRef> paths = new ArrayList<FileRef>(mapFiles);
+    
+    if (paths.size() <= maxFiles)
+      return paths;
+    
+    String newDir = String.format("%s/pass_%04d", tmpDir, pass);
+    
+    int start = 0;
+    
+    ArrayList<FileRef> outFiles = new ArrayList<FileRef>();
+    
+    int count = 0;
+    
+    while (start < paths.size()) {
+      int end = Math.min(maxFiles + start, paths.size());
+      List<FileRef> inFiles = paths.subList(start, end);
+      
+      start = end;
+      
+      FileRef newMapFile = new FileRef(String.format("%s/%04d." + RFile.EXTENSION, newDir, count++));
+      
+      outFiles.add(newMapFile);
+      FileSystem ns = fs.getFileSystemByPath(newMapFile.path());
+      FileSKVWriter writer = new RFileOperations().openWriter(newMapFile.toString(), ns, ns.getConf(), acuConf);
+      writer.startDefaultLocalityGroup();
+      List<SortedKeyValueIterator<Key,Value>> iters = new ArrayList<SortedKeyValueIterator<Key,Value>>(inFiles.size());
+      
+      FileSKVIterator reader = null;
+      try {
+        for (FileRef s : inFiles) {
+          reader = FileOperations.getInstance().openIndex(s.path().toString(), ns, ns.getConf(), acuConf);
+          iters.add(reader);
+        }
+        
+        MultiIterator mmfi = new MultiIterator(iters, true);
+        
+        while (mmfi.hasTop()) {
+          Key key = mmfi.getTopKey();
+          
+          boolean gtPrevEndRow = prevEndRow == null || key.compareRow(prevEndRow) > 0;
+          boolean lteEndRow = endRow == null || key.compareRow(endRow) <= 0;
+          
+          if (gtPrevEndRow && lteEndRow)
+            writer.append(key, new Value(new byte[0]));
+          
+          if (!lteEndRow)
+            break;
+          
+          mmfi.next();
+        }
+      } finally {
+        try {
+          if (reader != null)
+            reader.close();
+        } catch (IOException e) {
+          log.error(e, e);
+        }
+        
+        for (SortedKeyValueIterator<Key,Value> r : iters)
+          try {
+            if (r != null)
+              ((FileSKVIterator) r).close();
+          } catch (IOException e) {
+            // continue closing
+            log.error(e, e);
+          }
+        
+        try {
+          if (writer != null)
+            writer.close();
+        } catch (IOException e) {
+          log.error(e, e);
+          throw e;
+        }
+      }
+    }
+    
+    return reduceFiles(acuConf, conf, fs, prevEndRow, endRow, outFiles, maxFiles, tmpDir, pass + 1);
+  }
+
+  public static SortedMap<Double,Key> findMidPoint(VolumeManager fs, AccumuloConfiguration acuConf, Text prevEndRow, Text endRow, Collection<FileRef> mapFiles,
+      double minSplit) throws IOException {
+    return findMidPoint(fs, acuConf, prevEndRow, endRow, mapFiles, minSplit, true);
+  }
+  
+  public static double estimatePercentageLTE(VolumeManager fs, AccumuloConfiguration acuconf, Text prevEndRow, Text endRow, Collection<FileRef> mapFiles,
+      Text splitRow) throws IOException {
+    
+    Configuration conf = CachedConfiguration.getInstance();
+    
+    String tmpDir = null;
+    
+    int maxToOpen = acuconf.getCount(Property.TSERV_TABLET_SPLIT_FINDMIDPOINT_MAXOPEN);
+    ArrayList<FileSKVIterator> readers = new ArrayList<FileSKVIterator>(mapFiles.size());
+    
+    try {
+      if (mapFiles.size() > maxToOpen) {
+        tmpDir = createTmpDir(acuconf, fs);
+        
+        log.debug("Too many indexes (" + mapFiles.size() + ") to open at once for " + endRow + " " + prevEndRow + ", reducing in tmpDir = " + tmpDir);
+        
+        long t1 = System.currentTimeMillis();
+        mapFiles = reduceFiles(acuconf, conf, fs, prevEndRow, endRow, mapFiles, maxToOpen, tmpDir, 0);
+        long t2 = System.currentTimeMillis();
+        
+        log.debug("Finished reducing indexes for " + endRow + " " + prevEndRow + " in " + String.format("%6.2f secs", (t2 - t1) / 1000.0));
+      }
+      
+      if (prevEndRow == null)
+        prevEndRow = new Text();
+      
+      long numKeys = 0;
+      
+      numKeys = countIndexEntries(acuconf, prevEndRow, endRow, mapFiles, true, conf, fs, readers);
+      
+      if (numKeys == 0) {
+        // not enough info in the index to answer the question, so instead of going to
+        // the data just punt and return .5
+        return .5;
+      }
+      
+      List<SortedKeyValueIterator<Key,Value>> iters = new ArrayList<SortedKeyValueIterator<Key,Value>>(readers);
+      MultiIterator mmfi = new MultiIterator(iters, true);
+      
+      // skip keys at or before prevEndRow
+      while (mmfi.hasTop() && mmfi.getTopKey().compareRow(prevEndRow) <= 0) {
+        mmfi.next();
+      }
+      
+      int numLte = 0;
+      
+      while (mmfi.hasTop() && mmfi.getTopKey().compareRow(splitRow) <= 0) {
+        numLte++;
+        mmfi.next();
+      }
+      
+      if (numLte > numKeys) {
+        // something went wrong
+        throw new RuntimeException("numLte > numKeys " + numLte + " " + numKeys + " " + prevEndRow + " " + endRow + " " + splitRow + " " + mapFiles);
+      }
+      
+      // do not want to return 0% or 100%, so add 1 and 2 below
+      return (numLte + 1) / (double) (numKeys + 2);
+      
+    } finally {
+      cleanupIndexOp(acuconf, tmpDir, fs, readers);
+    }
+  }
+  
+  /**
+   * 
+   * @param mapFiles
+   *          - list of map files in which to find the mid point key
+   * 
+   *          ISSUES : This method uses the index files to find the mid point. If the map files have different index intervals this method will not return an
+   *          accurate mid point. Also, it would be tricky to use this method in conjunction with an in memory map because the indexing interval is unknown.
+   */
+  public static SortedMap<Double,Key> findMidPoint(VolumeManager fs, AccumuloConfiguration acuConf, Text prevEndRow, Text endRow, Collection<FileRef> mapFiles,
+      double minSplit, boolean useIndex) throws IOException {
+    Configuration conf = CachedConfiguration.getInstance();
+    
+    Collection<FileRef> origMapFiles = mapFiles;
+    
+    String tmpDir = null;
+    
+    int maxToOpen = acuConf.getCount(Property.TSERV_TABLET_SPLIT_FINDMIDPOINT_MAXOPEN);
+    ArrayList<FileSKVIterator> readers = new ArrayList<FileSKVIterator>(mapFiles.size());
+    
+    try {
+      if (mapFiles.size() > maxToOpen) {
+        if (!useIndex)
+          throw new IOException("Cannot find mid point using data files, too many " + mapFiles.size());
+        tmpDir = createTmpDir(acuConf, fs);
+        
+        log.debug("Too many indexes (" + mapFiles.size() + ") to open at once for " + endRow + " " + prevEndRow + ", reducing in tmpDir = " + tmpDir);
+        
+        long t1 = System.currentTimeMillis();
+        mapFiles = reduceFiles(acuConf, conf, fs, prevEndRow, endRow, mapFiles, maxToOpen, tmpDir, 0);
+        long t2 = System.currentTimeMillis();
+        
+        log.debug("Finished reducing indexes for " + endRow + " " + prevEndRow + " in " + String.format("%6.2f secs", (t2 - t1) / 1000.0));
+      }
+      
+      if (prevEndRow == null)
+        prevEndRow = new Text();
+      
+      long t1 = System.currentTimeMillis();
+      
+      long numKeys = 0;
+      
+      numKeys = countIndexEntries(acuConf, prevEndRow, endRow, mapFiles, tmpDir == null ? useIndex : false, conf, fs, readers);
+      
+      if (numKeys == 0) {
+        if (useIndex) {
+          log.warn("Failed to find mid point using indexes, falling back to data files which is slower. No entries between " + prevEndRow + " and " + endRow
+              + " for " + mapFiles);
+          // need to pass original map files, not possibly reduced indexes
+          return findMidPoint(fs, acuConf, prevEndRow, endRow, origMapFiles, minSplit, false);
+        }
+        throw new IOException("Failed to find mid point, no entries between " + prevEndRow + " and " + endRow + " for " + mapFiles);
+      }
+      
+      List<SortedKeyValueIterator<Key,Value>> iters = new ArrayList<SortedKeyValueIterator<Key,Value>>(readers);
+      MultiIterator mmfi = new MultiIterator(iters, true);
+      
+      // skip keys at or before prevEndRow
+      while (mmfi.hasTop() && mmfi.getTopKey().compareRow(prevEndRow) <= 0)
+        mmfi.next();
+      
+      // read half of the keys in the index
+      TreeMap<Double,Key> ret = new TreeMap<Double,Key>();
+      Key lastKey = null;
+      long keysRead = 0;
+      
+      Key keyBeforeMidPoint = null;
+      long keyBeforeMidPointPosition = 0;
+      
+      while (keysRead < numKeys / 2) {
+        if (lastKey != null && !lastKey.equals(mmfi.getTopKey(), PartialKey.ROW) && (keysRead - 1) / (double) numKeys >= minSplit) {
+          keyBeforeMidPoint = new Key(lastKey);
+          keyBeforeMidPointPosition = keysRead - 1;
+        }
+        
+        if (lastKey == null)
+          lastKey = new Key();
+        
+        lastKey.set(mmfi.getTopKey());
+        
+        keysRead++;
+        
+        // consume minimum
+        mmfi.next();
+      }
+      
+      if (keyBeforeMidPoint != null)
+        ret.put(keyBeforeMidPointPosition / (double) numKeys, keyBeforeMidPoint);
+      
+      long t2 = System.currentTimeMillis();
+      
+      log.debug(String.format("Found midPoint from indexes in %6.2f secs.%n", ((t2 - t1) / 1000.0)));
+      
+      ret.put(.5, mmfi.getTopKey());
+      
+      // sanity check
+      for (Key key : ret.values()) {
+        boolean inRange = (key.compareRow(prevEndRow) > 0 && (endRow == null || key.compareRow(endRow) < 1));
+        if (!inRange) {
+          throw new IOException("Found mid point is not in range " + key + " " + prevEndRow + " " + endRow + " " + mapFiles);
+        }
+      }
+      
+      return ret;
+    } finally {
+      cleanupIndexOp(acuConf, tmpDir, fs, readers);
+    }
+  }
+  
+  private static void cleanupIndexOp(AccumuloConfiguration acuConf, String tmpDir, VolumeManager fs, ArrayList<FileSKVIterator> readers) throws IOException {
+    // close all of the index sequence files
+    for (FileSKVIterator r : readers) {
+      try {
+        if (r != null)
+          r.close();
+      } catch (IOException e) {
+        // okay, try to close the rest anyway
+        log.error(e, e);
+      }
+    }
+    
+    if (tmpDir != null) {
+      String tmpPrefix = acuConf.get(Property.INSTANCE_DFS_DIR) + "/tmp";
+      if (tmpDir.startsWith(tmpPrefix))
+        fs.deleteRecursively(new Path(tmpDir));
+      else
+        log.error("Did not delete tmp dir because it wasn't a tmp dir " + tmpDir);
+    }
+  }
+  
+  private static long countIndexEntries(AccumuloConfiguration acuConf, Text prevEndRow, Text endRow, Collection<FileRef> mapFiles, boolean useIndex,
+      Configuration conf, VolumeManager fs, ArrayList<FileSKVIterator> readers) throws IOException {
+    
+    long numKeys = 0;
+    
+    // count the total number of index entries
+    for (FileRef ref : mapFiles) {
+      FileSKVIterator reader = null;
+      Path path = ref.path();
+      FileSystem ns = fs.getFileSystemByPath(path);
+      try {
+        if (useIndex)
+          reader = FileOperations.getInstance().openIndex(path.toString(), ns, ns.getConf(), acuConf);
+        else
+          reader = FileOperations.getInstance().openReader(path.toString(), new Range(prevEndRow, false, null, true), LocalityGroupUtil.EMPTY_CF_SET, false, ns, ns.getConf(),
+              acuConf);
+        
+        while (reader.hasTop()) {
+          Key key = reader.getTopKey();
+          if (endRow != null && key.compareRow(endRow) > 0)
+            break;
+          else if (prevEndRow == null || key.compareRow(prevEndRow) > 0)
+            numKeys++;
+          
+          reader.next();
+        }
+      } finally {
+        try {
+          if (reader != null)
+            reader.close();
+        } catch (IOException e) {
+          log.error(e, e);
+        }
+      }
+      
+      if (useIndex)
+        readers.add(FileOperations.getInstance().openIndex(path.toString(), ns, ns.getConf(), acuConf));
+      else
+        readers.add(FileOperations.getInstance().openReader(path.toString(), new Range(prevEndRow, false, null, true), LocalityGroupUtil.EMPTY_CF_SET, false, ns, ns.getConf(),
+            acuConf));
+      
+    }
+    return numKeys;
+  }
+  
+  public static Map<FileRef,FileInfo> tryToGetFirstAndLastRows(VolumeManager fs, AccumuloConfiguration acuConf, Set<FileRef> mapfiles) {
+    
+    HashMap<FileRef,FileInfo> mapFilesInfo = new HashMap<FileRef,FileInfo>();
+    
+    long t1 = System.currentTimeMillis();
+    
+    for (FileRef mapfile : mapfiles) {
+      
+      FileSKVIterator reader = null;
+      FileSystem ns = fs.getFileSystemByPath(mapfile.path());
+      try {
+        reader = FileOperations.getInstance().openReader(mapfile.toString(), false, ns, ns.getConf(), acuConf);
+        
+        Key firstKey = reader.getFirstKey();
+        if (firstKey != null) {
+          mapFilesInfo.put(mapfile, new FileInfo(firstKey, reader.getLastKey()));
+        }
+        
+      } catch (IOException ioe) {
+        log.warn("Failed to read map file to determine first and last key : " + mapfile, ioe);
+      } finally {
+        if (reader != null) {
+          try {
+            reader.close();
+          } catch (IOException ioe) {
+            log.warn("failed to close " + mapfile, ioe);
+          }
+        }
+      }
+      
+    }
+    
+    long t2 = System.currentTimeMillis();
+    
+    log.debug(String.format("Found first and last keys for %d map files in %6.2f secs", mapfiles.size(), (t2 - t1) / 1000.0));
+    
+    return mapFilesInfo;
+  }
+  
+  public static WritableComparable<Key> findLastKey(VolumeManager fs, AccumuloConfiguration acuConf, Collection<FileRef> mapFiles) throws IOException {
+    Key lastKey = null;
+    
+    for (FileRef ref : mapFiles) {
+      Path path = ref.path();
+      FileSystem ns = fs.getFileSystemByPath(path);
+      FileSKVIterator reader = FileOperations.getInstance().openReader(path.toString(), true, ns, ns.getConf(), acuConf);
+      
+      try {
+        if (!reader.hasTop())
+          // file is empty, so there is no last key
+          continue;
+        
+        Key key = reader.getLastKey();
+        
+        if (lastKey == null || key.compareTo(lastKey) > 0)
+          lastKey = key;
+      } finally {
+        try {
+          if (reader != null)
+            reader.close();
+        } catch (IOException e) {
+          log.error(e, e);
+        }
+      }
+    }
+    
+    return lastKey;
+    
+  }
+  
+  private static class MLong {
+    public MLong(long i) {
+      l = i;
+    }
+    
+    long l;
+  }
+  
+  public static Map<KeyExtent,Long> estimateSizes(AccumuloConfiguration acuConf, Path mapFile, long fileSize, List<KeyExtent> extents, Configuration conf,
+      VolumeManager fs) throws IOException {
+    
+    long totalIndexEntries = 0;
+    Map<KeyExtent,MLong> counts = new TreeMap<KeyExtent,MLong>();
+    for (KeyExtent keyExtent : extents)
+      counts.put(keyExtent, new MLong(0));
+    
+    Text row = new Text();
+    FileSystem ns = fs.getFileSystemByPath(mapFile);
+    FileSKVIterator index = FileOperations.getInstance().openIndex(mapFile.toString(), ns, ns.getConf(), acuConf);
+    
+    try {
+      while (index.hasTop()) {
+        Key key = index.getTopKey();
+        totalIndexEntries++;
+        key.getRow(row);
+        
+        for (Entry<KeyExtent,MLong> entry : counts.entrySet())
+          if (entry.getKey().contains(row))
+            entry.getValue().l++;
+        
+        index.next();
+      }
+    } finally {
+      try {
+        if (index != null)
+          index.close();
+      } catch (IOException e) {
+        // continue with next file
+        log.error(e, e);
+      }
+    }
+    
+    Map<KeyExtent,Long> results = new TreeMap<KeyExtent,Long>();
+    for (KeyExtent keyExtent : extents) {
+      double numEntries = counts.get(keyExtent).l;
+      if (numEntries == 0)
+        numEntries = 1;
+      long estSize = (long) ((numEntries / totalIndexEntries) * fileSize);
+      results.put(keyExtent, estSize);
+    }
+    return results;
+  }
+  
+}

Propchange: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/util/FileUtil.java
------------------------------------------------------------------------------
    svn:eol-style = native

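Two numeric details in the new FileUtil deserve a note. estimatePercentageLTE() returns (numLte + 1) / (numKeys + 2), deliberately never reporting exactly 0% or 100% (per its inline comment), and estimateSizes() apportions a file's size across extents in proportion to index-entry counts, treating a zero count as one so every extent gets a nonzero estimate. A self-contained worked example of the latter, with made-up numbers:

    import java.util.LinkedHashMap;
    import java.util.Map;

    // Worked example of the proportional estimate in FileUtil.estimateSizes():
    // each extent is credited fileSize * (its index entries / total entries).
    public class EstimateSizesExample {
      public static void main(String[] args) {
        long fileSize = 1000L;                 // bytes in the map file
        long totalIndexEntries = 10;           // index keys seen while scanning

        Map<String,Long> counts = new LinkedHashMap<String,Long>();
        counts.put("extentA", 7L);             // 7 index keys fell in extentA
        counts.put("extentB", 3L);
        counts.put("extentC", 0L);             // no keys: still credited as 1

        for (Map.Entry<String,Long> e : counts.entrySet()) {
          double numEntries = Math.max(e.getValue(), 1);
          long estSize = (long) ((numEntries / totalIndexEntries) * fileSize);
          System.out.println(e.getKey() + " ~ " + estSize + " bytes");
        }
        // prints: extentA ~ 700, extentB ~ 300, extentC ~ 100
      }
    }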
Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/util/Initialize.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/util/Initialize.java?rev=1494759&r1=1494758&r2=1494759&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/util/Initialize.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/util/Initialize.java Wed Jun 19 20:18:30 2013
@@ -18,7 +18,10 @@ package org.apache.accumulo.server.util;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Locale;
 import java.util.Map.Entry;
 import java.util.UUID;
@@ -31,12 +34,12 @@ import org.apache.accumulo.core.client.A
 import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.conf.SiteConfiguration;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.KeyExtent;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.file.FileOperations;
 import org.apache.accumulo.core.file.FileSKVWriter;
-import org.apache.accumulo.core.file.FileUtil;
 import org.apache.accumulo.core.iterators.user.VersioningIterator;
 import org.apache.accumulo.core.master.state.tables.TableState;
 import org.apache.accumulo.core.master.thrift.MasterGoalState;
@@ -52,6 +55,8 @@ import org.apache.accumulo.server.Server
 import org.apache.accumulo.server.client.HdfsZooInstance;
 import org.apache.accumulo.server.conf.ServerConfiguration;
 import org.apache.accumulo.server.constraints.MetadataConstraints;
+import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.server.fs.VolumeManagerImpl;
 import org.apache.accumulo.server.iterators.MetadataBulkLoadFilter;
 import org.apache.accumulo.server.master.state.tables.TableManager;
 import org.apache.accumulo.server.security.AuditedSecurityOperation;
@@ -112,13 +117,13 @@ public class Initialize {
     initialMetadataConf.put(Property.TABLE_BLOCKCACHE_ENABLED.getKey(), "true");
   }
   
-  public static boolean doInit(Opts opts, Configuration conf, FileSystem fs) throws IOException {
+  public static boolean doInit(Opts opts, Configuration conf, VolumeManager fs) throws IOException {
     if (!ServerConfiguration.getSiteConfiguration().get(Property.INSTANCE_DFS_URI).equals(""))
       log.info("Hadoop Filesystem is " + ServerConfiguration.getSiteConfiguration().get(Property.INSTANCE_DFS_URI));
     else
       log.info("Hadoop Filesystem is " + FileSystem.getDefaultUri(conf));
     
-    log.info("Accumulo data dir is " + ServerConstants.getBaseDir());
+    log.info("Accumulo data dirs are " + Arrays.asList(ServerConstants.getBaseDirs()));
     log.info("Zookeeper server is " + ServerConfiguration.getSiteConfiguration().get(Property.INSTANCE_ZK_HOST));
     log.info("Checking if Zookeeper is available. If this hangs, then you need to make sure zookeeper is running");
     if (!zookeeperAvailable()) {
@@ -160,7 +165,7 @@ public class Initialize {
     return initialize(opts, instanceNamePath, fs);
   }
   
-  public static boolean initialize(Opts opts, String instanceNamePath, FileSystem fs) {
+  public static boolean initialize(Opts opts, String instanceNamePath, VolumeManager fs) {
     
     UUID uuid = UUID.randomUUID();
     try {
@@ -171,7 +176,7 @@ public class Initialize {
     }
     
     try {
-      initFileSystem(opts, fs, fs.getConf(), uuid);
+      initFileSystem(opts, fs, uuid);
     } catch (Exception e) {
       log.fatal("Failed to initialize filesystem", e);
       return false;
@@ -196,18 +201,36 @@ public class Initialize {
       return false;
     }
   }
+  private static Path[] paths(String[] paths) {
+    Path result[] = new Path[paths.length];
+    for (int i = 0; i < paths.length; i++) {
+      result[i] = new Path(paths[i]);
+    }
+    return result;
+  }
+  
+  private static <T> T[] concat(T[] a, T[] b) {
+    List<T> result = new ArrayList<T>(a.length + b.length);
+    for (int i = 0; i < a.length; i++) {
+      result.add(a[i]);
+    }
+    for (int i = 0; i < b.length; i++) {
+      result.add(b[i]);
+    }
+    return result.toArray(a);
+  }
   
-  private static void initFileSystem(Opts opts, FileSystem fs, Configuration conf, UUID uuid) throws IOException {
+  private static void initFileSystem(Opts opts, VolumeManager fs, UUID uuid) throws IOException {
     FileStatus fstat;
     
     // the actual disk location of the root tablet
     final Path rootTablet = new Path(ServerConstants.getRootTabletDir());
     
-    final Path tableMetadataTablet = new Path(ServerConstants.getMetadataTableDir() + MetadataTable.TABLE_TABLET_LOCATION);
-    final Path defaultMetadataTablet = new Path(ServerConstants.getMetadataTableDir() + Constants.DEFAULT_TABLET_LOCATION);
-    
-    final Path metadataTableDir = new Path(ServerConstants.getMetadataTableDir());
+    final Path tableMetadataTabletDirs[] = paths(ServerConstants.prefix(ServerConstants.getMetadataTableDirs(), MetadataTable.TABLE_TABLET_LOCATION));
+    final Path defaultMetadataTabletDirs[] = paths(ServerConstants.prefix(ServerConstants.getMetadataTableDirs(), Constants.DEFAULT_TABLET_LOCATION));
     
+    final Path metadataTableDirs[] = paths(ServerConstants.getMetadataTableDirs());
+
     fs.mkdirs(new Path(ServerConstants.getDataVersionLocation(), "" + ServerConstants.DATA_VERSION));
     
     // create an instance id
@@ -218,17 +241,18 @@ public class Initialize {
     initMetadataConfig();
     
     // create metadata table
-    try {
-      fstat = fs.getFileStatus(metadataTableDir);
-      if (!fstat.isDir()) {
-        log.fatal("location " + metadataTableDir.toString() + " exists but is not a directory");
-        return;
-      }
-    } catch (FileNotFoundException fnfe) {
-      // create btl dir
-      if (!fs.mkdirs(metadataTableDir)) {
-        log.fatal("unable to create directory " + metadataTableDir.toString());
-        return;
+    for (Path mtd : metadataTableDirs) {
+      try {
+        fstat = fs.getFileStatus(mtd);
+        if (!fstat.isDir()) {
+          log.fatal("location " + mtd.toString() + " exists but is not a directory");
+          return;
+        }
+      } catch (FileNotFoundException fnfe) {
+        if (!fs.mkdirs(mtd)) {
+          log.fatal("unable to create directory " + mtd.toString());
+          return;
+        }
       }
     }
     
@@ -240,91 +264,96 @@ public class Initialize {
         return;
       }
     } catch (FileNotFoundException fnfe) {
-      // create btl dir
       if (!fs.mkdirs(rootTablet)) {
         log.fatal("unable to create directory " + rootTablet.toString());
         return;
       }
-      
-      // populate the root tablet with info about the default tablet
-      // the root tablet contains the key extent and locations of all the
-      // metadata tablets
-      String initRootTabFile = ServerConstants.getRootTabletDir() + "/00000_00000."
-          + FileOperations.getNewFileExtension(AccumuloConfiguration.getDefaultConfiguration());
-      FileSKVWriter mfw = FileOperations.getInstance().openWriter(initRootTabFile, fs, conf, AccumuloConfiguration.getDefaultConfiguration());
-      mfw.startDefaultLocalityGroup();
-      
-      // -----------] root tablet info
-      Text rootExtent = RootTable.ROOT_TABLET_EXTENT.getMetadataEntry();
-      
-      // root's directory
-      Key rootDirKey = new Key(rootExtent, MetadataTable.DIRECTORY_COLUMN.getColumnFamily(), MetadataTable.DIRECTORY_COLUMN.getColumnQualifier(), 0);
-      mfw.append(rootDirKey, new Value("/root_tablet".getBytes()));
-      
-      // root's prev row
-      Key rootPrevRowKey = new Key(rootExtent, MetadataTable.PREV_ROW_COLUMN.getColumnFamily(), MetadataTable.PREV_ROW_COLUMN.getColumnQualifier(), 0);
-      mfw.append(rootPrevRowKey, new Value(new byte[] {0}));
-      
-      // ----------] table tablet info
-      Text tableExtent = new Text(KeyExtent.getMetadataEntry(new Text(MetadataTable.ID), MetadataTable.RESERVED_KEYSPACE_START_KEY.getRow()));
-      
-      // table tablet's directory
-      Key tableDirKey = new Key(tableExtent, MetadataTable.DIRECTORY_COLUMN.getColumnFamily(), MetadataTable.DIRECTORY_COLUMN.getColumnQualifier(), 0);
-      mfw.append(tableDirKey, new Value(MetadataTable.TABLE_TABLET_LOCATION.getBytes()));
-      
-      // table tablet time
-      Key tableTimeKey = new Key(tableExtent, MetadataTable.TIME_COLUMN.getColumnFamily(), MetadataTable.TIME_COLUMN.getColumnQualifier(), 0);
-      mfw.append(tableTimeKey, new Value((TabletTime.LOGICAL_TIME_ID + "0").getBytes()));
-      
-      // table tablet's prevrow
-      Key tablePrevRowKey = new Key(tableExtent, MetadataTable.PREV_ROW_COLUMN.getColumnFamily(), MetadataTable.PREV_ROW_COLUMN.getColumnQualifier(), 0);
-      mfw.append(tablePrevRowKey, KeyExtent.encodePrevEndRow(new Text(KeyExtent.getMetadataEntry(new Text(MetadataTable.ID), null))));
-      
-      // ----------] default tablet info
-      Text defaultExtent = new Text(KeyExtent.getMetadataEntry(new Text(MetadataTable.ID), null));
-      
-      // default's directory
-      Key defaultDirKey = new Key(defaultExtent, MetadataTable.DIRECTORY_COLUMN.getColumnFamily(), MetadataTable.DIRECTORY_COLUMN.getColumnQualifier(), 0);
-      mfw.append(defaultDirKey, new Value(Constants.DEFAULT_TABLET_LOCATION.getBytes()));
-      
-      // default's time
-      Key defaultTimeKey = new Key(defaultExtent, MetadataTable.TIME_COLUMN.getColumnFamily(), MetadataTable.TIME_COLUMN.getColumnQualifier(), 0);
-      mfw.append(defaultTimeKey, new Value((TabletTime.LOGICAL_TIME_ID + "0").getBytes()));
-      
-      // default's prevrow
-      Key defaultPrevRowKey = new Key(defaultExtent, MetadataTable.PREV_ROW_COLUMN.getColumnFamily(), MetadataTable.PREV_ROW_COLUMN.getColumnQualifier(), 0);
-      mfw.append(defaultPrevRowKey, KeyExtent.encodePrevEndRow(MetadataTable.RESERVED_KEYSPACE_START_KEY.getRow()));
-      
-      mfw.close();
     }
     
+    // populate the root tablet with info about the default tablet
+    // the root tablet contains the key extent and locations of all the
+    // metadata tablets
+    String initRootTabFile = rootTablet + "/00000_00000."
+        + FileOperations.getNewFileExtension(AccumuloConfiguration.getDefaultConfiguration());
+    FileSystem ns = fs.getFileSystemByPath(new Path(initRootTabFile));
+    FileSKVWriter mfw = FileOperations.getInstance().openWriter(initRootTabFile, ns, ns.getConf(), AccumuloConfiguration.getDefaultConfiguration());
+    mfw.startDefaultLocalityGroup();
+    
+    // -----------] root tablet info
+    Text rootExtent = RootTable.ROOT_TABLET_EXTENT.getMetadataEntry();
+    
+    // root's directory
+    Key rootDirKey = new Key(rootExtent, MetadataTable.DIRECTORY_COLUMN.getColumnFamily(), MetadataTable.DIRECTORY_COLUMN.getColumnQualifier(), 0);
+    mfw.append(rootDirKey, new Value("/root_tablet".getBytes()));
+    
+    // root's prev row
+    Key rootPrevRowKey = new Key(rootExtent, MetadataTable.PREV_ROW_COLUMN.getColumnFamily(), MetadataTable.PREV_ROW_COLUMN.getColumnQualifier(), 0);
+    mfw.append(rootPrevRowKey, new Value(new byte[] {0}));
+    
+    // ----------] table tablet info
+    Text tableExtent = new Text(KeyExtent.getMetadataEntry(new Text(MetadataTable.ID), MetadataTable.RESERVED_KEYSPACE_START_KEY.getRow()));
+    
+    // table tablet's directory
+    Key tableDirKey = new Key(tableExtent, MetadataTable.DIRECTORY_COLUMN.getColumnFamily(), MetadataTable.DIRECTORY_COLUMN.getColumnQualifier(), 0);
+    mfw.append(tableDirKey, new Value(MetadataTable.TABLE_TABLET_LOCATION.getBytes()));
+    
+    // table tablet time
+    Key tableTimeKey = new Key(tableExtent, MetadataTable.TIME_COLUMN.getColumnFamily(), MetadataTable.TIME_COLUMN.getColumnQualifier(), 0);
+    mfw.append(tableTimeKey, new Value((TabletTime.LOGICAL_TIME_ID + "0").getBytes()));
+    
+    // table tablet's prevrow
+    Key tablePrevRowKey = new Key(tableExtent, MetadataTable.PREV_ROW_COLUMN.getColumnFamily(), MetadataTable.PREV_ROW_COLUMN.getColumnQualifier(),
+        0);
+    mfw.append(tablePrevRowKey, KeyExtent.encodePrevEndRow(new Text(KeyExtent.getMetadataEntry(new Text(MetadataTable.ID), null))));
+    
+    // ----------] default tablet info
+    Text defaultExtent = new Text(KeyExtent.getMetadataEntry(new Text(MetadataTable.ID), null));
+    
+    // default's directory
+    Key defaultDirKey = new Key(defaultExtent, MetadataTable.DIRECTORY_COLUMN.getColumnFamily(),
+        MetadataTable.DIRECTORY_COLUMN.getColumnQualifier(), 0);
+    mfw.append(defaultDirKey, new Value(Constants.DEFAULT_TABLET_LOCATION.getBytes()));
+    
+    // default's time
+    Key defaultTimeKey = new Key(defaultExtent, MetadataTable.TIME_COLUMN.getColumnFamily(), MetadataTable.TIME_COLUMN.getColumnQualifier(), 0);
+    mfw.append(defaultTimeKey, new Value((TabletTime.LOGICAL_TIME_ID + "0").getBytes()));
+    
+    // default's prevrow
+    Key defaultPrevRowKey = new Key(defaultExtent, MetadataTable.PREV_ROW_COLUMN.getColumnFamily(),
+        MetadataTable.PREV_ROW_COLUMN.getColumnQualifier(), 0);
+    mfw.append(defaultPrevRowKey, KeyExtent.encodePrevEndRow(MetadataTable.RESERVED_KEYSPACE_START_KEY.getRow()));
+    
+    mfw.close();
+    
     // create table and default tablets directories
-    try {
-      fstat = fs.getFileStatus(defaultMetadataTablet);
-      if (!fstat.isDir()) {
-        log.fatal("location " + defaultMetadataTablet.toString() + " exists but is not a directory");
-        return;
-      }
-    } catch (FileNotFoundException fnfe) {
+    for (Path dir : concat(defaultMetadataTabletDirs, tableMetadataTabletDirs)) {
       try {
-        fstat = fs.getFileStatus(tableMetadataTablet);
+        fstat = fs.getFileStatus(dir);
         if (!fstat.isDir()) {
-          log.fatal("location " + tableMetadataTablet.toString() + " exists but is not a directory");
+          log.fatal("location " + dir.toString() + " exists but is not a directory");
           return;
         }
-      } catch (FileNotFoundException fnfe2) {
-        // create table info dir
-        if (!fs.mkdirs(tableMetadataTablet)) {
-          log.fatal("unable to create directory " + tableMetadataTablet.toString());
+      } catch (FileNotFoundException fnfe) {
+        // create the tablet directory; re-statting here would just throw
+        // FileNotFoundException again, so go straight to mkdirs
+        if (!fs.mkdirs(dir)) {
+          log.fatal("unable to create directory " + dir.toString());
           return;
         }
       }
-      
-      // create default dir
-      if (!fs.mkdirs(defaultMetadataTablet)) {
-        log.fatal("unable to create directory " + defaultMetadataTablet.toString());
-        return;
-      }
     }
   }
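
The loop above replaces two hand-copied check-then-create blocks, one for the default
tablet directory and one for the table tablet directory, with a single pass over every
tablet directory on every volume. A minimal sketch of the idempotent pattern it
implements, assuming the VolumeManager getFileStatus/mkdirs signatures used in the hunk
(the ensureDirectory helper name is hypothetical):

    import java.io.FileNotFoundException;
    import java.io.IOException;
    import org.apache.accumulo.server.fs.VolumeManager;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.Path;
    import org.apache.log4j.Logger;

    // Hypothetical helper: ensure a tablet directory exists and is a directory.
    // Returns false (after logging) when initialization cannot proceed.
    static boolean ensureDirectory(VolumeManager fs, Logger log, Path dir) throws IOException {
      try {
        FileStatus fstat = fs.getFileStatus(dir);
        if (!fstat.isDir()) {
          log.fatal("location " + dir + " exists but is not a directory");
          return false;
        }
      } catch (FileNotFoundException fnfe) {
        // mkdirs reports success when the directory already exists,
        // so the call is safe to repeat across volumes
        if (!fs.mkdirs(dir)) {
          log.fatal("unable to create directory " + dir);
          return false;
        }
      }
      return true;
    }
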
   
@@ -448,7 +477,7 @@ public class Initialize {
     initialMetadataConf.put(Property.TABLE_FILE_REPLICATION.getKey(), rep);
   }
   
-  public static boolean isInitialized(FileSystem fs) throws IOException {
+  public static boolean isInitialized(VolumeManager fs) throws IOException {
     return (fs.exists(ServerConstants.getInstanceIdLocation()) || fs.exists(ServerConstants.getDataVersionLocation()));
   }
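
isInitialized now takes the VolumeManager rather than a raw FileSystem, so the existence
probes for the instance id and data version locations can land on whichever volume holds
them. A short usage sketch, assuming the no-argument VolumeManagerImpl.get() entry point
that appears in LocalityCheck below (the exception is illustrative error handling):

    VolumeManager fs = VolumeManagerImpl.get();
    if (Initialize.isInitialized(fs))
      throw new IOException("this instance is already initialized"); // illustrative
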
   
@@ -475,7 +504,8 @@ public class Initialize {
       SecurityUtil.serverLogin();
       Configuration conf = CachedConfiguration.getInstance();
       
-      FileSystem fs = FileUtil.getFileSystem(conf, ServerConfiguration.getSiteConfiguration());
+      @SuppressWarnings("deprecation")
+      VolumeManager fs = VolumeManagerImpl.get(SiteConfiguration.getSiteConfiguration());
       
       if (opts.resetSecurity) {
         if (isInitialized(fs)) {

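The first Initialize hunk above is worth spelling out: the root tablet bootstrap now runs
unconditionally after the directory checks, and it resolves the concrete Hadoop
FileSystem that owns the target path through the VolumeManager before opening the RFile
writer. A condensed sketch of that write pattern, assuming the FileOperations and
metadata-column constants shown in the diff (the file path literal is illustrative, and
the real bootstrap also writes the table tablet and default tablet entries):

    // Resolve the volume that owns the path, then write the root
    // tablet's metadata entries into a brand-new RFile.
    VolumeManager fs = VolumeManagerImpl.get();
    String file = "/accumulo/tables/!0/root_tablet/00000_00000.rf"; // illustrative
    FileSystem ns = fs.getFileSystemByPath(new Path(file));
    FileSKVWriter mfw = FileOperations.getInstance().openWriter(file, ns, ns.getConf(),
        AccumuloConfiguration.getDefaultConfiguration());
    mfw.startDefaultLocalityGroup();
    Text rootExtent = RootTable.ROOT_TABLET_EXTENT.getMetadataEntry();
    // entries must be appended in sorted key order; timestamps are fixed at 0
    mfw.append(new Key(rootExtent, MetadataTable.DIRECTORY_COLUMN.getColumnFamily(),
        MetadataTable.DIRECTORY_COLUMN.getColumnQualifier(), 0),
        new Value("/root_tablet".getBytes()));
    mfw.append(new Key(rootExtent, MetadataTable.PREV_ROW_COLUMN.getColumnFamily(),
        MetadataTable.PREV_ROW_COLUMN.getColumnQualifier(), 0),
        new Value(new byte[] {0}));
    mfw.close();
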
Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/util/LocalityCheck.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/util/LocalityCheck.java?rev=1494759&r1=1494758&r2=1494759&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/util/LocalityCheck.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/util/LocalityCheck.java Wed Jun 19 20:18:30 2013
@@ -25,13 +25,12 @@ import java.util.Map.Entry;
 import org.apache.accumulo.core.client.Connector;
 import org.apache.accumulo.core.client.Scanner;
 import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.KeyExtent;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.security.Authorizations;
-import org.apache.accumulo.core.util.CachedConfiguration;
 import org.apache.accumulo.core.util.MetadataTable;
-import org.apache.accumulo.server.ServerConstants;
 import org.apache.accumulo.server.cli.ClientOpts;
+import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.server.fs.VolumeManagerImpl;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -43,7 +42,7 @@ public class LocalityCheck {
     ClientOpts opts = new ClientOpts();
     opts.parseArgs(LocalityCheck.class.getName(), args);
     
-    FileSystem fs = FileSystem.get(CachedConfiguration.getInstance());
+    VolumeManager fs = VolumeManagerImpl.get();
     Connector connector = opts.getConnector();
     Scanner scanner = connector.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
     scanner.fetchColumnFamily(MetadataTable.CURRENT_LOCATION_COLUMN_FAMILY);
@@ -63,7 +62,8 @@ public class LocalityCheck {
         addBlocks(fs, host, files, totalBlocks, localBlocks);
         files.clear();
       } else if (key.compareColumnFamily(MetadataTable.DATAFILE_COLUMN_FAMILY) == 0) {
-        files.add(new String(KeyExtent.tableOfMetadataRow(key.getRow())) + slash(key.getColumnQualifier().toString()));
+        files.add(fs.getFullPath(key).toString());
       }
     }
     System.out.println(" Server         %local  total blocks");
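
The change above is the heart of this file's conversion: rather than gluing
ServerConstants.getTablesDir() onto the table id and column qualifier by hand,
LocalityCheck asks the VolumeManager for the fully qualified path of the datafile named
by the metadata key, which is what makes multiple volumes resolvable at all. A fragment
of the scan loop under that assumption (scanner setup as in the hunks above):

    // Collect the volume-qualified datafile paths for the tablets
    // currently assigned to one tablet server.
    ArrayList<String> files = new ArrayList<String>();
    for (Entry<Key,Value> entry : scanner) {
      Key key = entry.getKey();
      if (key.compareColumnFamily(MetadataTable.DATAFILE_COLUMN_FAMILY) == 0) {
        // getFullPath maps the relative entry onto the volume that holds it
        files.add(fs.getFullPath(key).toString());
      }
    }
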
@@ -73,13 +73,7 @@ public class LocalityCheck {
     return 0;
   }
   
-  private static String slash(String path) {
-    if (path.startsWith("/"))
-      return path;
-    return "/" + path;
-  }
-  
-  private void addBlocks(FileSystem fs, String host, ArrayList<String> files, Map<String,Long> totalBlocks, Map<String,Long> localBlocks) throws Exception {
+  private void addBlocks(VolumeManager fs, String host, ArrayList<String> files, Map<String,Long> totalBlocks, Map<String,Long> localBlocks) throws Exception {
     long allBlocks = 0;
     long matchingBlocks = 0;
     if (!totalBlocks.containsKey(host)) {
@@ -87,9 +81,10 @@ public class LocalityCheck {
       localBlocks.put(host, 0L);
     }
     for (String file : files) {
-      Path filePath = new Path(ServerConstants.getTablesDir() + "/" + file);
-      FileStatus fileStatus = fs.getFileStatus(filePath);
-      BlockLocation[] fileBlockLocations = fs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen());
+      Path filePath = new Path(file);
+      FileSystem ns = fs.getFileSystemByPath(filePath);
+      FileStatus fileStatus = ns.getFileStatus(filePath);
+      BlockLocation[] fileBlockLocations = ns.getFileBlockLocations(fileStatus, 0, fileStatus.getLen());
       for (BlockLocation blockLocation : fileBlockLocations) {
         allBlocks++;
         for (String location : blockLocation.getHosts()) {

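The last hunk applies the same per-path resolution to the locality arithmetic: each
datafile is mapped to the FileSystem that owns it before its block locations are fetched,
so files on different volumes are counted against the right namespace. A compact sketch
of the per-file accounting, assuming the Hadoop FileStatus/BlockLocation calls shown
above (the host comparison is illustrative, since the tail of the loop is cut off in this
hunk):

    // Count total blocks, and blocks with a replica on the given host.
    long allBlocks = 0;
    long matchingBlocks = 0;
    Path filePath = new Path(file);
    FileSystem ns = fs.getFileSystemByPath(filePath);
    FileStatus fileStatus = ns.getFileStatus(filePath);
    BlockLocation[] blockLocations = ns.getFileBlockLocations(fileStatus, 0, fileStatus.getLen());
    for (BlockLocation blockLocation : blockLocations) {
      allBlocks++;
      for (String location : blockLocation.getHosts()) {
        // getHosts() returns the hostnames of the datanodes holding this block
        if (location.equals(host)) {
          matchingBlocks++;
          break;
        }
      }
    }
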

