accumulo-commits mailing list archives

From ctubb...@apache.org
Subject [40/59] [abbrv] ACCUMULO-658 consistent package names to avoid overlapped sealed jars
Date Sat, 07 Sep 2013 03:28:43 GMT
http://git-wip-us.apache.org/repos/asf/accumulo/blob/a10587ed/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerConstants.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerConstants.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerConstants.java
new file mode 100644
index 0000000..df416a4
--- /dev/null
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerConstants.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.tserver;
+
+public class TabletServerConstants {
+  
+  public static final String IntermediateKeyName = "IntermediateKey";
+  public static final String ColumnSetName = "ColumnSet";
+  public static final String AuthorizationSetName = "AuthorizationSet";
+  public static final String EndKeyName = "EndKey";
+  public static final String MaxResultsName = "MaxResults";
+  public static final String PreviousQueryTypeName = "PreviousQueryType";
+  public static final String PreviousQueryStatusName = "PreviousQueryStatus";
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/a10587ed/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
new file mode 100644
index 0000000..3a6f3b1
--- /dev/null
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
@@ -0,0 +1,804 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.tserver;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.accumulo.trace.instrument.TraceExecutorService;
+import org.apache.accumulo.tserver.FileManager.ScanFileManager;
+import org.apache.accumulo.tserver.Tablet.MajorCompactionReason;
+import org.apache.accumulo.tserver.Tablet.MinorCompactionReason;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.file.blockfile.cache.LruBlockCache;
+import org.apache.accumulo.core.metadata.RootTable;
+import org.apache.accumulo.core.metadata.schema.DataFileValue;
+import org.apache.accumulo.core.util.Daemon;
+import org.apache.accumulo.core.util.LoggingRunnable;
+import org.apache.accumulo.core.util.NamingThreadFactory;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.server.conf.ServerConfiguration;
+import org.apache.accumulo.server.fs.FileRef;
+import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.server.util.time.SimpleTimer;
+import org.apache.accumulo.start.classloader.vfs.AccumuloVFSClassLoader;
+import org.apache.log4j.Logger;
+
+/**
+ * ResourceManager is responsible for managing the resources of all tablets within a tablet server.
+ */
+public class TabletServerResourceManager {
+  
+  private ExecutorService minorCompactionThreadPool;
+  private ExecutorService majorCompactionThreadPool;
+  private ExecutorService rootMajorCompactionThreadPool;
+  private ExecutorService defaultMajorCompactionThreadPool;
+  private ExecutorService splitThreadPool;
+  private ExecutorService defaultSplitThreadPool;
+  private ExecutorService defaultMigrationPool;
+  private ExecutorService migrationPool;
+  private ExecutorService assignmentPool;
+  private ExecutorService assignMetaDataPool;
+  private ExecutorService readAheadThreadPool;
+  private ExecutorService defaultReadAheadThreadPool;
+  private Map<String,ExecutorService> threadPools = new TreeMap<String,ExecutorService>();
+  
+  private HashSet<TabletResourceManager> tabletResources;
+  
+  private FileManager fileManager;
+  
+  private MemoryManager memoryManager;
+  
+  private MemoryManagementFramework memMgmt;
+  
+  private final LruBlockCache _dCache;
+  private final LruBlockCache _iCache;
+  private final ServerConfiguration conf;
+  
+  private static final Logger log = Logger.getLogger(TabletServerResourceManager.class);
+  
+  private ExecutorService addEs(String name, ExecutorService tp) {
+    if (threadPools.containsKey(name)) {
+      throw new IllegalArgumentException("Cannot create two executor services with same name " + name);
+    }
+    tp = new TraceExecutorService(tp);
+    threadPools.put(name, tp);
+    return tp;
+  }
+  
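+  // Registers the pool and schedules a task (initial delay 1 s, period 10 s) that
+  // reconciles the pool's core/max size with the live value of maxThreads, so the
+  // property can take effect without a tablet server restart.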
+  private ExecutorService addEs(final Property maxThreads, String name, final ThreadPoolExecutor tp) {
+    ExecutorService result = addEs(name, tp);
+    SimpleTimer.getInstance().schedule(new Runnable() {
+      @Override
+      public void run() {
+        try {
+          int max = conf.getConfiguration().getCount(maxThreads);
+          if (tp.getMaximumPoolSize() != max) {
+            log.info("Changing " + maxThreads.getKey() + " to " + max);
+            tp.setCorePoolSize(max);
+            tp.setMaximumPoolSize(max);
+          }
+        } catch (Throwable t) {
+          log.error(t, t);
+        }
+      }
+      
+    }, 1000, 10 * 1000);
+    return result;
+  }
+
+  private ExecutorService createEs(int max, String name) {
+    return addEs(name, Executors.newFixedThreadPool(max, new NamingThreadFactory(name)));
+  }
+  
+  private ExecutorService createEs(Property max, String name) {
+    return createEs(max, name, new LinkedBlockingQueue<Runnable>());
+  }
+
+  private ExecutorService createEs(Property max, String name, BlockingQueue<Runnable> queue) {
+    int maxThreads = conf.getConfiguration().getCount(max);
+    ThreadPoolExecutor tp = new ThreadPoolExecutor(maxThreads, maxThreads, 0L, TimeUnit.MILLISECONDS, queue, new NamingThreadFactory(name));
+    return addEs(max, name, tp);
+  }
+
+  private ExecutorService createEs(int min, int max, int timeout, String name) {
+    return addEs(name, new ThreadPoolExecutor(min, max, timeout, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new NamingThreadFactory(name)));
+  }
+  
+  public TabletServerResourceManager(Instance instance, VolumeManager fs) {
+    this.conf = new ServerConfiguration(instance);
+    final AccumuloConfiguration acuConf = conf.getConfiguration();
+    
+    long maxMemory = acuConf.getMemoryInBytes(Property.TSERV_MAXMEM);
+    boolean usingNativeMap = acuConf.getBoolean(Property.TSERV_NATIVEMAP_ENABLED) && NativeMap.loadedNativeLibraries();
+    
+    long blockSize = acuConf.getMemoryInBytes(Property.TSERV_DEFAULT_BLOCKSIZE);
+    long dCacheSize = acuConf.getMemoryInBytes(Property.TSERV_DATACACHE_SIZE);
+    long iCacheSize = acuConf.getMemoryInBytes(Property.TSERV_INDEXCACHE_SIZE);
+    
+    _iCache = new LruBlockCache(iCacheSize, blockSize);
+    _dCache = new LruBlockCache(dCacheSize, blockSize);
+    
+    Runtime runtime = Runtime.getRuntime();
+    if (!usingNativeMap && maxMemory + dCacheSize + iCacheSize > runtime.maxMemory()) {
+      throw new IllegalArgumentException(String.format(
+          "Maximum tablet server map memory %,d and block cache sizes %,d are too large for this JVM configuration %,d", maxMemory, dCacheSize + iCacheSize,
+          runtime.maxMemory()));
+    }
+    runtime.gc();
+
+    // totalMemory - freeMemory = memory in use
+    // maxMemory - memory in use = max available memory
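+    // e.g. maxMemory()=4g, totalMemory()=2g, freeMemory()=1g: 1g in use, 3g available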
+    if (!usingNativeMap && maxMemory > runtime.maxMemory() - (runtime.totalMemory() - runtime.freeMemory())) {
+      log.warn("In-memory map may not fit into local memory space.");
+    }
+    
+    minorCompactionThreadPool = createEs(Property.TSERV_MINC_MAXCONCURRENT, "minor compactor");
+    
+    // this pool uses a priority queue (CompactionQueue) so that tablets with the
+    // most files are compacted first
+    majorCompactionThreadPool = createEs(Property.TSERV_MAJC_MAXCONCURRENT, "major compactor", new CompactionQueue());
+    rootMajorCompactionThreadPool = createEs(0, 1, 300, "md root major compactor");
+    defaultMajorCompactionThreadPool = createEs(0, 1, 300, "md major compactor");
+    
+    splitThreadPool = createEs(1, "splitter");
+    defaultSplitThreadPool = createEs(0, 1, 60, "md splitter");
+    
+    defaultMigrationPool = createEs(0, 1, 60, "metadata tablet migration");
+    migrationPool = createEs(Property.TSERV_MIGRATE_MAXCONCURRENT, "tablet migration");
+    
+    // not sure if concurrent assignments can run safely... even if they could there is probably no benefit at startup because
+    // individual tablet servers are already running assignments concurrently... having each individual tablet server run
+    // concurrent assignments would put more load on the metadata table at startup
+    assignmentPool = createEs(1, "tablet assignment");
+    
+    assignMetaDataPool = createEs(0, 1, 60, "metadata tablet assignment");
+    
+    readAheadThreadPool = createEs(Property.TSERV_READ_AHEAD_MAXCONCURRENT, "tablet read ahead");
+    defaultReadAheadThreadPool = createEs(Property.TSERV_METADATA_READ_AHEAD_MAXCONCURRENT, "metadata tablets read ahead");
+    
+    tabletResources = new HashSet<TabletResourceManager>();
+    
+    int maxOpenFiles = acuConf.getCount(Property.TSERV_SCAN_MAX_OPENFILES);
+    
+    fileManager = new FileManager(conf, fs, maxOpenFiles, _dCache, _iCache);
+    
+    try {
+      Class<? extends MemoryManager> clazz = AccumuloVFSClassLoader.loadClass(acuConf.get(Property.TSERV_MEM_MGMT), MemoryManager.class);
+      memoryManager = clazz.newInstance();
+      memoryManager.init(conf);
+      log.debug("Loaded memory manager : " + memoryManager.getClass().getName());
+    } catch (Exception e) {
+      log.error("Failed to find memory manger in config, using default", e);
+    }
+    
+    if (memoryManager == null) {
+      memoryManager = new LargestFirstMemoryManager();
+    }
+    
+    memMgmt = new MemoryManagementFramework();
+  }
+  
+  private static class TabletStateImpl implements TabletState, Cloneable {
+    
+    private long lct;
+    private Tablet tablet;
+    private long mts;
+    private long mcmts;
+    
+    public TabletStateImpl(Tablet t, long mts, long lct, long mcmts) {
+      this.tablet = t;
+      this.mts = mts;
+      this.lct = lct;
+      this.mcmts = mcmts;
+    }
+    
+    public KeyExtent getExtent() {
+      return tablet.getExtent();
+    }
+    
+    Tablet getTablet() {
+      return tablet;
+    }
+    
+    public long getLastCommitTime() {
+      return lct;
+    }
+    
+    public long getMemTableSize() {
+      return mts;
+    }
+    
+    public long getMinorCompactingMemTableSize() {
+      return mcmts;
+    }
+  }
+  
+  private class MemoryManagementFramework {
+    private final Map<KeyExtent,TabletStateImpl> tabletReports;
+    private LinkedBlockingQueue<TabletStateImpl> memUsageReports;
+    private long lastMemCheckTime = System.currentTimeMillis();
+    private long maxMem;
+    
+    MemoryManagementFramework() {
+      tabletReports = Collections.synchronizedMap(new HashMap<KeyExtent,TabletStateImpl>());
+      memUsageReports = new LinkedBlockingQueue<TabletStateImpl>();
+      maxMem = conf.getConfiguration().getMemoryInBytes(Property.TSERV_MAXMEM);
+      
+      Runnable r1 = new Runnable() {
+        public void run() {
+          processTabletMemStats();
+        }
+      };
+      
+      Thread t1 = new Daemon(new LoggingRunnable(log, r1));
+      t1.setPriority(Thread.NORM_PRIORITY + 1);
+      t1.setName("Accumulo Memory Guard");
+      t1.start();
+      
+      Runnable r2 = new Runnable() {
+        public void run() {
+          manageMemory();
+        }
+      };
+      
+      Thread t2 = new Daemon(new LoggingRunnable(log, r2));
+      t2.setName("Accumulo Minor Compaction Initiator");
+      t2.start();
+      
+    }
+    
+    private long lastMemTotal = 0;
+    
+    private void processTabletMemStats() {
+      while (true) {
+        try {
+          
+          TabletStateImpl report = memUsageReports.take();
+          
+          while (report != null) {
+            tabletReports.put(report.getExtent(), report);
+            report = memUsageReports.poll();
+          }
+          
+          long delta = System.currentTimeMillis() - lastMemCheckTime;
+          if (holdCommits || delta > 50 || lastMemTotal > 0.90 * maxMem) {
+            lastMemCheckTime = System.currentTimeMillis();
+            
+            long totalMemUsed = 0;
+            
+            synchronized (tabletReports) {
+              for (TabletStateImpl tsi : tabletReports.values()) {
+                totalMemUsed += tsi.getMemTableSize();
+                totalMemUsed += tsi.getMinorCompactingMemTableSize();
+              }
+            }
+            
+            if (totalMemUsed > 0.95 * maxMem) {
+              holdAllCommits(true);
+            } else {
+              holdAllCommits(false);
+            }
+            
+            lastMemTotal = totalMemUsed;
+          }
+          
+        } catch (InterruptedException e) {
+          log.warn(e, e);
+        }
+      }
+    }
+    
+    private void manageMemory() {
+      while (true) {
+        MemoryManagementActions mma = null;
+        
+        try {
+          ArrayList<TabletState> tablets;
+          synchronized (tabletReports) {
+            tablets = new ArrayList<TabletState>(tabletReports.values());
+          }
+          mma = memoryManager.getMemoryManagementActions(tablets);
+          
+        } catch (Throwable t) {
+          log.error("Memory manager failed " + t.getMessage(), t);
+        }
+        
+        try {
+          if (mma != null && mma.tabletsToMinorCompact != null && mma.tabletsToMinorCompact.size() > 0) {
+            for (KeyExtent keyExtent : mma.tabletsToMinorCompact) {
+              TabletStateImpl tabletReport = tabletReports.get(keyExtent);
+              
+              if (tabletReport == null) {
+                log.warn("Memory manager asked to compact nonexistant tablet " + keyExtent);
+                continue;
+              }
+              
+              if (!tabletReport.getTablet().initiateMinorCompaction(MinorCompactionReason.SYSTEM)) {
+                if (tabletReport.getTablet().isClosed()) {
+                  tabletReports.remove(tabletReport.getExtent());
+                  log.debug("Ignoring memory manager recommendation: not minor compacting closed tablet " + keyExtent);
+                } else {
+                  log.info("Ignoring memory manager recommendation: not minor compacting " + keyExtent);
+                }
+              }
+            }
+            
+            // log.debug("mma.tabletsToMinorCompact = "+mma.tabletsToMinorCompact);
+          }
+        } catch (Throwable t) {
+          log.error("Minor compactions for memory managment failed", t);
+        }
+        
+        UtilWaitThread.sleep(250);
+      }
+    }
+    
+    public void updateMemoryUsageStats(Tablet tablet, long size, long lastCommitTime, long mincSize) {
+      memUsageReports.add(new TabletStateImpl(tablet, size, lastCommitTime, mincSize));
+    }
+    
+    public void tabletClosed(KeyExtent extent) {
+      tabletReports.remove(extent);
+    }
+  }
+  
+  private final Object commitHold = new Object();
+  private volatile boolean holdCommits = false;
+  private long holdStartTime;
+  
+  protected void holdAllCommits(boolean holdAllCommits) {
+    synchronized (commitHold) {
+      if (holdCommits != holdAllCommits) {
+        holdCommits = holdAllCommits;
+        
+        if (holdCommits) {
+          holdStartTime = System.currentTimeMillis();
+        }
+        
+        if (!holdCommits) {
+          log.debug(String.format("Commits held for %6.2f secs", (System.currentTimeMillis() - holdStartTime) / 1000.0));
+          commitHold.notifyAll();
+        }
+      }
+    }
+    
+  }
+  
+  void waitUntilCommitsAreEnabled() {
+    if (holdCommits) {
+      long timeout = System.currentTimeMillis() + conf.getConfiguration().getTimeInMillis(Property.GENERAL_RPC_TIMEOUT);
+      synchronized (commitHold) {
+        while (holdCommits) {
+          try {
+            if (System.currentTimeMillis() > timeout)
+              throw new HoldTimeoutException("Commits are held");
+            commitHold.wait(1000);
+          } catch (InterruptedException e) {}
+        }
+      }
+    }
+  }
+  
+  public long holdTime() {
+    if (!holdCommits)
+      return 0;
+    synchronized (commitHold) {
+      return System.currentTimeMillis() - holdStartTime;
+    }
+  }
+  
+  public void close() {
+    for (ExecutorService executorService : threadPools.values()) {
+      executorService.shutdown();
+    }
+    
+    for (Entry<String,ExecutorService> entry : threadPools.entrySet()) {
+      while (true) {
+        try {
+          if (entry.getValue().awaitTermination(60, TimeUnit.SECONDS))
+            break;
+          log.info("Waiting for thread pool " + entry.getKey() + " to shutdown");
+        } catch (InterruptedException e) {
+          log.warn(e);
+        }
+      }
+    }
+  }
+  
+  public synchronized TabletResourceManager createTabletResourceManager() {
+    TabletResourceManager trm = new TabletResourceManager();
+    return trm;
+  }
+  
+  synchronized private void addTabletResource(TabletResourceManager tr) {
+    tabletResources.add(tr);
+  }
+  
+  synchronized private void removeTabletResource(TabletResourceManager tr) {
+    tabletResources.remove(tr);
+  }
+  
+  private class MapFileInfo {
+    private final FileRef path;
+    private final long size;
+    
+    MapFileInfo(FileRef path, long size) {
+      this.path = path;
+      this.size = size;
+    }
+  }
+  
+  public class TabletResourceManager {
+    
+    private final long creationTime = System.currentTimeMillis();
+    
+    private volatile boolean openFilesReserved = false;
+    
+    private volatile boolean closed = false;
+    
+    private Tablet tablet;
+    
+    private AccumuloConfiguration tableConf;
+    
+    TabletResourceManager() {}
+    
+    void setTablet(Tablet tablet, AccumuloConfiguration tableConf) {
+      this.tablet = tablet;
+      this.tableConf = tableConf;
+      // TabletResourceManager is not fully initialized until this method is
+      // called, so it is not made publicly available until now
+      
+      addTabletResource(this);
+    }
+    
+    // BEGIN methods that Tablets call to manage their set of open map files
+    
+    public void importedMapFiles() {
+      lastReportedCommitTime = System.currentTimeMillis();
+    }
+    
+    synchronized ScanFileManager newScanFileManager() {
+      if (closed)
+        throw new IllegalStateException("closed");
+      return fileManager.newScanFileManager(tablet.getExtent());
+    }
+    
+    // END methods that Tablets call to manage their set of open map files
+    
+    // BEGIN methods that Tablets call to manage memory
+    
+    private AtomicLong lastReportedSize = new AtomicLong();
+    private AtomicLong lastReportedMincSize = new AtomicLong();
+    private volatile long lastReportedCommitTime = 0;
+    
+    public void updateMemoryUsageStats(long size, long mincSize) {
+      
+      // do not update stats for every little change; reporting stats acquires
+      // a lock, and all tablets should not contend on that same lock for
+      // every commit
+      long totalSize = size + mincSize;
+      long lrs = lastReportedSize.get();
+      long delta = totalSize - lrs;
+      long lrms = lastReportedMincSize.get();
+      boolean report = false;
+      // the atomic longs are considered independently: when one is set, the other is
+      // intentionally not set, because this method is not synchronized and therefore
+      // offers no transactional semantics for reading and writing both variables
+      if ((lrms > 0 && mincSize == 0 || lrms == 0 && mincSize > 0) && lastReportedMincSize.compareAndSet(lrms, mincSize)) {
+        report = true;
+      }
+      
+      long currentTime = System.currentTimeMillis();
+      if ((delta > 32000 || delta < 0 || (currentTime - lastReportedCommitTime > 1000)) && lastReportedSize.compareAndSet(lrs, totalSize)) {
+        if (delta > 0)
+          lastReportedCommitTime = currentTime;
+        report = true;
+      }
+      
+      if (report)
+        memMgmt.updateMemoryUsageStats(tablet, size, lastReportedCommitTime, mincSize);
+    }
+    
+    // END methods that Tablets call to manage memory
+    
+    // BEGIN methods that Tablets call to make decisions about major compaction
+    // when too many files are open, we may want tablets to compact down
+    // to one map file
+    Map<FileRef,Long> findMapFilesToCompact(SortedMap<FileRef,DataFileValue> tabletFiles, MajorCompactionReason reason) {
+      if (reason == MajorCompactionReason.USER) {
+        Map<FileRef,Long> files = new HashMap<FileRef,Long>();
+        for (Entry<FileRef,DataFileValue> entry : tabletFiles.entrySet()) {
+          files.put(entry.getKey(), entry.getValue().getSize());
+        }
+        return files;
+      }
+      
+      if (tabletFiles.size() <= 1)
+        return null;
+      TreeSet<MapFileInfo> candidateFiles = new TreeSet<MapFileInfo>(new Comparator<MapFileInfo>() {
+        @Override
+        public int compare(MapFileInfo o1, MapFileInfo o2) {
+          if (o1 == o2)
+            return 0;
+          if (o1.size < o2.size)
+            return -1;
+          if (o1.size > o2.size)
+            return 1;
+          return o1.path.compareTo(o2.path);
+        }
+      });
+      
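+      // The selection loop below repeatedly discards the largest candidate until the
+      // largest remaining file is at most totalSize/ratio, i.e. until the remaining
+      // files are of comparable size. E.g. with ratio 3 and sizes {1,1,1,10}: 10*3 > 13
+      // so 10 is dropped, then 1*3 <= 3 and {1,1,1} is chosen.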
+      double ratio = tableConf.getFraction(Property.TABLE_MAJC_RATIO);
+      int maxFilesToCompact = tableConf.getCount(Property.TSERV_MAJC_THREAD_MAXOPEN);
+      int maxFilesPerTablet = tableConf.getMaxFilesPerTablet();
+      
+      for (Entry<FileRef,DataFileValue> entry : tabletFiles.entrySet()) {
+        candidateFiles.add(new MapFileInfo(entry.getKey(), entry.getValue().getSize()));
+      }
+      
+      long totalSize = 0;
+      for (MapFileInfo mfi : candidateFiles) {
+        totalSize += mfi.size;
+      }
+      
+      Map<FileRef,Long> files = new HashMap<FileRef,Long>();
+      
+      while (candidateFiles.size() > 1) {
+        MapFileInfo max = candidateFiles.last();
+        if (max.size * ratio <= totalSize) {
+          files.clear();
+          for (MapFileInfo mfi : candidateFiles) {
+            files.put(mfi.path, mfi.size);
+            if (files.size() >= maxFilesToCompact)
+              break;
+          }
+          
+          break;
+        }
+        totalSize -= max.size;
+        candidateFiles.remove(max);
+      }
+      
+      int totalFilesToCompact = 0;
+      if (tabletFiles.size() > maxFilesPerTablet)
+        totalFilesToCompact = tabletFiles.size() - maxFilesPerTablet + 1;
+      
+      totalFilesToCompact = Math.min(totalFilesToCompact, maxFilesToCompact);
+      
+      if (files.size() < totalFilesToCompact) {
+        
+        TreeMap<FileRef,DataFileValue> tfc = new TreeMap<FileRef,DataFileValue>(tabletFiles);
+        tfc.keySet().removeAll(files.keySet());
+        
+        // put data in candidateFiles to sort it
+        candidateFiles.clear();
+        for (Entry<FileRef,DataFileValue> entry : tfc.entrySet())
+          candidateFiles.add(new MapFileInfo(entry.getKey(), entry.getValue().getSize()));
+        
+        for (MapFileInfo mfi : candidateFiles) {
+          files.put(mfi.path, mfi.size);
+          if (files.size() >= totalFilesToCompact)
+            break;
+        }
+      }
+      
+      if (files.size() == 0)
+        return null;
+      
+      return files;
+    }
+    
+    boolean needsMajorCompaction(SortedMap<FileRef,DataFileValue> tabletFiles, MajorCompactionReason reason) {
+      if (closed)
+        return false; // throw new IOException("closed");
+        
+      // int threshold;
+      
+      if (reason == MajorCompactionReason.USER)
+        return true;
+      
+      if (reason == MajorCompactionReason.IDLE) {
+        // threshold = 1;
+        long idleTime;
+        if (lastReportedCommitTime == 0) {
+          // no commits, so compute how long the tablet has been assigned to the
+          // tablet server
+          idleTime = System.currentTimeMillis() - creationTime;
+        } else {
+          idleTime = System.currentTimeMillis() - lastReportedCommitTime;
+        }
+        
+        if (idleTime < tableConf.getTimeInMillis(Property.TABLE_MAJC_COMPACTALL_IDLETIME)) {
+          return false;
+        }
+      }/*
+        * else{ threshold = tableConf.getCount(Property.TABLE_MAJC_THRESHOLD); }
+        */
+      
+      return findMapFilesToCompact(tabletFiles, reason) != null;
+    }
+    
+    // END methods that Tablets call to make decisions about major compaction
+    
+    // tablets call this method to run minor compactions,
+    // this allows us to control how many minor compactions
+    // run concurrently in a tablet server
+    void executeMinorCompaction(final Runnable r) {
+      minorCompactionThreadPool.execute(new LoggingRunnable(log, r));
+    }
+    
+    void close() throws IOException {
+      // always obtain locks in same order to avoid deadlock
+      synchronized (TabletServerResourceManager.this) {
+        synchronized (this) {
+          if (closed)
+            throw new IOException("closed");
+          if (openFilesReserved)
+            throw new IOException("tired to close files while open files reserved");
+          
+          TabletServerResourceManager.this.removeTabletResource(this);
+          
+          memMgmt.tabletClosed(tablet.getExtent());
+          memoryManager.tabletClosed(tablet.getExtent());
+          
+          closed = true;
+        }
+      }
+    }
+    
+    public TabletServerResourceManager getTabletServerResourceManager() {
+      return TabletServerResourceManager.this;
+    }
+    
+    public void executeMajorCompaction(KeyExtent tablet, Runnable compactionTask) {
+      TabletServerResourceManager.this.executeMajorCompaction(tablet, compactionTask);
+    }
+    
+  }
+  
+  public void executeSplit(KeyExtent tablet, Runnable splitTask) {
+    if (tablet.isMeta()) {
+      if (tablet.isRootTablet()) {
+        log.warn("Saw request to split root tablet, ignoring");
+        return;
+      }
+      defaultSplitThreadPool.execute(splitTask);
+    } else {
+      splitThreadPool.execute(splitTask);
+    }
+  }
+  
+  public void executeMajorCompaction(KeyExtent tablet, Runnable compactionTask) {
+    if (tablet.equals(RootTable.EXTENT)) {
+      rootMajorCompactionThreadPool.execute(compactionTask);
+    } else if (tablet.isMeta()) {
+      defaultMajorCompactionThreadPool.execute(compactionTask);
+    } else {
+      majorCompactionThreadPool.execute(compactionTask);
+    }
+  }
+  
+  public void executeReadAhead(KeyExtent tablet, Runnable task) {
+    if (tablet.isRootTablet()) {
+      task.run();
+    } else if (tablet.isMeta()) {
+      defaultReadAheadThreadPool.execute(task);
+    } else {
+      readAheadThreadPool.execute(task);
+    }
+  }
+  
+  public void addAssignment(Runnable assignmentHandler) {
+    assignmentPool.execute(assignmentHandler);
+  }
+  
+  public void addMetaDataAssignment(Runnable assignmentHandler) {
+    assignMetaDataPool.execute(assignmentHandler);
+  }
+  
+  public void addMigration(KeyExtent tablet, Runnable migrationHandler) {
+    if (tablet.isRootTablet()) {
+      migrationHandler.run();
+    } else if (tablet.isMeta()) {
+      defaultMigrationPool.execute(migrationHandler);
+    } else {
+      migrationPool.execute(migrationHandler);
+    }
+  }
+  
+  public void stopSplits() {
+    splitThreadPool.shutdown();
+    defaultSplitThreadPool.shutdown();
+    while (true) {
+      try {
+        while (!splitThreadPool.awaitTermination(1, TimeUnit.MINUTES)) {
+          log.info("Waiting for metadata split thread pool to stop");
+        }
+        while (!defaultSplitThreadPool.awaitTermination(1, TimeUnit.MINUTES)) {
+          log.info("Waiting for split thread pool to stop");
+        }
+        break;
+      } catch (InterruptedException ex) {
+        log.info(ex, ex);
+      }
+    }
+  }
+  
+  public void stopNormalAssignments() {
+    assignmentPool.shutdown();
+    while (true) {
+      try {
+        while (!assignmentPool.awaitTermination(1, TimeUnit.MINUTES)) {
+          log.info("Waiting for assignment thread pool to stop");
+        }
+        break;
+      } catch (InterruptedException ex) {
+        log.info(ex, ex);
+      }
+    }
+  }
+  
+  public void stopMetadataAssignments() {
+    assignMetaDataPool.shutdown();
+    while (true) {
+      try {
+        while (!assignMetaDataPool.awaitTermination(1, TimeUnit.MINUTES)) {
+          log.info("Waiting for metadata assignment thread pool to stop");
+        }
+        break;
+      } catch (InterruptedException ex) {
+        log.info(ex, ex);
+      }
+    }
+  }
+  
+  public LruBlockCache getIndexCache() {
+    return _iCache;
+  }
+  
+  public LruBlockCache getDataCache() {
+    return _dCache;
+  }
+  
+}
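
The commit-hold logic above (holdAllCommits / waitUntilCommitsAreEnabled) is a guarded
wait on a dedicated monitor object. A minimal standalone sketch of the same pattern,
with hypothetical names rather than the actual class:

    class CommitGate {
      private final Object lock = new Object();
      private volatile boolean held = false;

      // flip the gate; releasing it wakes every writer parked in awaitOpen()
      void setHeld(boolean hold) {
        synchronized (lock) {
          if (held != hold) {
            held = hold;
            if (!held)
              lock.notifyAll();
          }
        }
      }

      // writers block here while commits are held
      void awaitOpen() throws InterruptedException {
        synchronized (lock) {
          while (held)
            lock.wait(1000);
        }
      }
    }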

http://git-wip-us.apache.org/repos/asf/accumulo/blob/a10587ed/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletState.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletState.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletState.java
new file mode 100644
index 0000000..8ce8f7a
--- /dev/null
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletState.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.tserver;
+
+import org.apache.accumulo.core.data.KeyExtent;
+
+public interface TabletState {
+  KeyExtent getExtent();
+  
+  long getLastCommitTime();
+  
+  long getMemTableSize();
+  
+  long getMinorCompactingMemTableSize();
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/a10587ed/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletStatsKeeper.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletStatsKeeper.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletStatsKeeper.java
new file mode 100644
index 0000000..58e16be
--- /dev/null
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletStatsKeeper.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.tserver;
+
+import org.apache.accumulo.core.tabletserver.thrift.ActionStats;
+import org.apache.accumulo.core.tabletserver.thrift.TabletStats;
+import org.apache.accumulo.server.util.ActionStatsUpdator;
+
+public class TabletStatsKeeper {
+  
+  private ActionStats major = new ActionStats();
+  private ActionStats minor = new ActionStats();
+  private ActionStats split = new ActionStats();
+  
+  public enum Operation {
+    MAJOR, SPLIT, MINOR
+  }
+  
+  private ActionStats[] map = new ActionStats[] {major, split, minor};
+  
+  public void updateTime(Operation operation, long queued, long start, long count, boolean failed) {
+    try {
+      ActionStats data = map[operation.ordinal()];
+      if (failed) {
+        data.fail++;
+        data.status--;
+      } else {
+        double t = (System.currentTimeMillis() - start) / 1000.0;
+        double q = (start - queued) / 1000.0;
+        
+        data.status--;
+        data.count += count;
+        data.num++;
+        data.elapsed += t;
+        data.queueTime += q;
+        data.sumDev += t * t;
+        data.queueSumDev += q * q;
+        if (data.elapsed < 0 || data.sumDev < 0 || data.queueSumDev < 0 || data.queueTime < 0)
+          resetTimes();
+      }
+    } catch (Exception e) {
+      resetTimes();
+    }
+    
+  }
+  
+  public void updateTime(Operation operation, long start, long count, boolean failed) {
+    try {
+      ActionStats data = map[operation.ordinal()];
+      if (failed) {
+        data.fail++;
+        data.status--;
+      } else {
+        double t = (System.currentTimeMillis() - start) / 1000.0;
+        
+        data.status--;
+        data.num++;
+        data.elapsed += t;
+        data.sumDev += t * t;
+        
+        if (data.elapsed < 0 || data.sumDev < 0 || data.queueSumDev < 0 || data.queueTime < 0)
+          resetTimes();
+      }
+    } catch (Exception e) {
+      resetTimes();
+    }
+    
+  }
+  
+  public void saveMinorTimes(TabletStatsKeeper t) {
+    ActionStatsUpdator.update(minor, t.minor);
+  }
+  
+  public void saveMajorTimes(TabletStatsKeeper t) {
+    ActionStatsUpdator.update(major, t.major);
+  }
+  
+  public void resetTimes() {
+    major = new ActionStats();
+    split = new ActionStats();
+    minor = new ActionStats();
+    // rebuild the lookup array so updateTime() sees the fresh instances
+    map = new ActionStats[] {major, split, minor};
+  }
+  
+  public void incrementStatusMinor() {
+    minor.status++;
+  }
+  
+  public void incrementStatusMajor() {
+    major.status++;
+  }
+  
+  public void incrementStatusSplit() {
+    split.status++;
+  }
+  
+  public TabletStats getTabletStats() {
+    return new TabletStats(null, major, minor, split, 0, 0, 0, 0);
+  }
+}
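
updateTime() above accumulates num, elapsed, and sumDev (the sum of squared durations)
per operation, which is enough to recover the mean and standard deviation later. A
hedged sketch of that derivation, assuming num > 0 (class and method names are
illustrative):

    final class ActionStatsMath {
      static double meanSeconds(long num, double elapsed) {
        return elapsed / num;
      }

      static double stdDevSeconds(long num, double elapsed, double sumDev) {
        double mean = elapsed / num;
        // Var[t] = E[t^2] - E[t]^2; clamp at zero to absorb floating-point noise
        return Math.sqrt(Math.max(0.0, sumDev / num - mean * mean));
      }
    }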

http://git-wip-us.apache.org/repos/asf/accumulo/blob/a10587ed/server/tserver/src/main/java/org/apache/accumulo/tserver/TooManyFilesException.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TooManyFilesException.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TooManyFilesException.java
new file mode 100644
index 0000000..98c7a02
--- /dev/null
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TooManyFilesException.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.tserver;
+
+import java.io.IOException;
+
+public class TooManyFilesException extends IOException {
+  
+  private static final long serialVersionUID = 1L;
+  
+  public TooManyFilesException(String msg) {
+    super(msg);
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/a10587ed/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
new file mode 100644
index 0000000..116ce29
--- /dev/null
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
@@ -0,0 +1,455 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.tserver.log;
+
+import static org.apache.accumulo.tserver.logger.LogEvents.COMPACTION_FINISH;
+import static org.apache.accumulo.tserver.logger.LogEvents.COMPACTION_START;
+import static org.apache.accumulo.tserver.logger.LogEvents.DEFINE_TABLET;
+import static org.apache.accumulo.tserver.logger.LogEvents.MANY_MUTATIONS;
+import static org.apache.accumulo.tserver.logger.LogEvents.OPEN;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.security.crypto.CryptoModuleFactory;
+import org.apache.accumulo.core.security.crypto.CryptoModuleParameters;
+import org.apache.accumulo.core.util.Daemon;
+import org.apache.accumulo.core.util.StringUtil;
+import org.apache.accumulo.server.ServerConstants;
+import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.server.master.state.TServerInstance;
+import org.apache.accumulo.tserver.TabletMutations;
+import org.apache.accumulo.tserver.logger.LogFileKey;
+import org.apache.accumulo.tserver.logger.LogFileValue;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Logger;
+
+/**
+ * Wrap a connection to a logger.
+ */
+public class DfsLogger {
+  // Package private so that LogSorter can find this
+  static final String LOG_FILE_HEADER_V2 = "--- Log File Header (v2) ---";
+  
+  private static Logger log = Logger.getLogger(DfsLogger.class);
+  
+  public static class LogClosedException extends IOException {
+    private static final long serialVersionUID = 1L;
+    
+    public LogClosedException() {
+      super("LogClosed");
+    }
+  }
+  
+  public interface ServerResources {
+    AccumuloConfiguration getConfiguration();
+    
+    VolumeManager getFileSystem();
+    
+    Set<TServerInstance> getCurrentTServers();
+  }
+  
+  private final LinkedBlockingQueue<DfsLogger.LogWork> workQueue = new LinkedBlockingQueue<DfsLogger.LogWork>();
+  
+  private final Object closeLock = new Object();
+  
+  private static final DfsLogger.LogWork CLOSED_MARKER = new DfsLogger.LogWork(null, null);
+  
+  private static final LogFileValue EMPTY = new LogFileValue();
+  
+  private boolean closed = false;
+  
+  private class LogSyncingTask implements Runnable {
+    
+    @Override
+    public void run() {
+      ArrayList<DfsLogger.LogWork> work = new ArrayList<DfsLogger.LogWork>();
+      while (true) {
+        work.clear();
+        
+        try {
+          work.add(workQueue.take());
+        } catch (InterruptedException ex) {
+          continue;
+        }
+        workQueue.drainTo(work);
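+        // group commit: the single sync below covers every batch drained above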
+        
+        synchronized (closeLock) {
+          if (!closed) {
+            try {
+              sync.invoke(logFile);
+            } catch (Exception ex) {
+              log.warn("Exception syncing " + ex);
+              for (DfsLogger.LogWork logWork : work) {
+                logWork.exception = ex;
+              }
+            }
+          } else {
+            for (DfsLogger.LogWork logWork : work) {
+              logWork.exception = new LogClosedException();
+            }
+          }
+        }
+        
+        boolean sawClosedMarker = false;
+        for (DfsLogger.LogWork logWork : work)
+          if (logWork == CLOSED_MARKER)
+            sawClosedMarker = true;
+          else
+            logWork.latch.countDown();
+        
+        if (sawClosedMarker) {
+          synchronized (closeLock) {
+            closeLock.notifyAll();
+          }
+          break;
+        }
+      }
+    }
+  }
+  
+  static class LogWork {
+    List<TabletMutations> mutations;
+    CountDownLatch latch;
+    volatile Exception exception;
+    
+    public LogWork(List<TabletMutations> mutations, CountDownLatch latch) {
+      this.mutations = mutations;
+      this.latch = latch;
+    }
+  }
+  
+  public static class LoggerOperation {
+    private final LogWork work;
+    
+    public LoggerOperation(LogWork work) {
+      this.work = work;
+    }
+    
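+    // Callers enqueue a batch via logManyTablets() and then call await() to block
+    // until the background sync thread has flushed (or failed) that batch.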
+    public void await() throws IOException {
+      try {
+        work.latch.await();
+      } catch (InterruptedException e) {
+        throw new RuntimeException(e);
+      }
+      
+      if (work.exception != null) {
+        if (work.exception instanceof IOException)
+          throw (IOException) work.exception;
+        else if (work.exception instanceof RuntimeException)
+          throw (RuntimeException) work.exception;
+        else
+          throw new RuntimeException(work.exception);
+      }
+    }
+  }
+  
+  @Override
+  public boolean equals(Object obj) {
+    // filename is unique
+    if (obj == null)
+      return false;
+    if (obj instanceof DfsLogger)
+      return getFileName().equals(((DfsLogger) obj).getFileName());
+    return false;
+  }
+  
+  @Override
+  public int hashCode() {
+    // filename is unique
+    return getFileName().hashCode();
+  }
+  
+  private final ServerResources conf;
+  private FSDataOutputStream logFile;
+  private DataOutputStream encryptingLogFile = null;
+  private Method sync;
+  private Path logPath;
+  private String logger;
+  
+  public DfsLogger(ServerResources conf) throws IOException {
+    this.conf = conf;
+  }
+  
+  public DfsLogger(ServerResources conf, String logger, Path filename) throws IOException {
+    this.conf = conf;
+    this.logger = logger;
+    this.logPath = filename;
+  }
+  
+  public static FSDataInputStream readHeader(VolumeManager fs, Path path, Map<String,String> opts) throws IOException {
+    FSDataInputStream file = fs.open(path);
+    try {
+      byte[] magic = LOG_FILE_HEADER_V2.getBytes();
+      byte[] buffer = new byte[magic.length];
+      file.readFully(buffer);
+      if (Arrays.equals(buffer, magic)) {
+        int count = file.readInt();
+        for (int i = 0; i < count; i++) {
+          String key = file.readUTF();
+          String value = file.readUTF();
+          opts.put(key, value);
+        }
+      } else {
+        file.seek(0);
+        return file;
+      }
+      return file;
+    } catch (IOException ex) {
+      file.seek(0);
+      return file;
+    }
+  }
+  
+  public synchronized void open(String address) throws IOException {
+    String filename = UUID.randomUUID().toString();
+    logger = StringUtil.join(Arrays.asList(address.split(":")), "+");
+    
+    log.debug("DfsLogger.open() begin");
+    VolumeManager fs = conf.getFileSystem();
+    
+    logPath = new Path(fs.choose(ServerConstants.getWalDirs()) + "/" + logger + "/" + filename);
+    try {
+      short replication = (short) conf.getConfiguration().getCount(Property.TSERV_WAL_REPLICATION);
+      if (replication == 0)
+        replication = fs.getDefaultReplication(logPath);
+      long blockSize = conf.getConfiguration().getMemoryInBytes(Property.TSERV_WAL_BLOCKSIZE);
+      if (blockSize == 0)
+        blockSize = (long) (conf.getConfiguration().getMemoryInBytes(Property.TSERV_WALOG_MAX_SIZE) * 1.1);
+      if (conf.getConfiguration().getBoolean(Property.TSERV_WAL_SYNC))
+        logFile = fs.createSyncable(logPath, 0, replication, blockSize);
+      else
+        logFile = fs.create(logPath, true, 0, replication, blockSize);
+      
+      try {
+        NoSuchMethodException e = null;
+        try {
+          // sync: send data to datanodes
+          sync = logFile.getClass().getMethod("sync");
+        } catch (NoSuchMethodException ex) {
+          e = ex;
+        }
+        try {
+          // hsync: send data to datanodes and sync the data to disk
+          sync = logFile.getClass().getMethod("hsync");
+          e = null;
+        } catch (NoSuchMethodException ex) {}
+        if (e != null)
+          throw new RuntimeException(e);
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+      
+      // Initialize the crypto operations.
+      org.apache.accumulo.core.security.crypto.CryptoModule cryptoModule = org.apache.accumulo.core.security.crypto.CryptoModuleFactory.getCryptoModule(conf
+          .getConfiguration().get(Property.CRYPTO_MODULE_CLASS));
+      
+      // Initialize the log file with a header and the crypto params used to set up this log file.
+      logFile.write(LOG_FILE_HEADER_V2.getBytes());
+
+      CryptoModuleParameters params = CryptoModuleFactory.createParamsObjectFromAccumuloConfiguration(conf.getConfiguration());
+      
+      params.setPlaintextOutputStream(logFile);
+      
+      // In order to bootstrap the reading of this file later, we record here the CryptoModule
+      // that was used to encipher it, so the crypto module can re-read its own parameters.
+      
+      logFile.writeUTF(conf.getConfiguration().get(Property.CRYPTO_MODULE_CLASS));
+      
+      
+      //@SuppressWarnings("deprecation")
+      //OutputStream encipheringOutputStream = cryptoModule.getEncryptingOutputStream(logFile, cryptoOpts);
+      params = cryptoModule.getEncryptingOutputStream(params);
+      OutputStream encipheringOutputStream = params.getEncryptedOutputStream();
+      
+      // If the module just kicks back our original stream, then just use it, don't wrap it in
+      // another data OutputStream.
+      if (encipheringOutputStream == logFile) {
+        encryptingLogFile = logFile;
+      } else {
+        encryptingLogFile = new DataOutputStream(encipheringOutputStream);
+      }
+      
+      LogFileKey key = new LogFileKey();
+      key.event = OPEN;
+      key.tserverSession = filename;
+      key.filename = filename;
+      write(key, EMPTY);
+      sync.invoke(logFile);
+      log.debug("Got new write-ahead log: " + this);
+    } catch (Exception ex) {
+      if (logFile != null)
+        logFile.close();
+      logFile = null;
+      throw new IOException(ex);
+    }
+    
+    Thread t = new Daemon(new LogSyncingTask());
+    t.setName("Accumulo WALog thread " + toString());
+    t.start();
+  }
+  
+  @Override
+  public String toString() {
+    return getLogger() + "/" + getFileName();
+  }
+  
+  public String getLogger() {
+    return logger;
+  }
+  
+  public String getFileName() {
+    return logPath.toString();
+  }
+  
+  public void close() throws IOException {
+    
+    synchronized (closeLock) {
+      if (closed)
+        return;
+      // after closed is set to true, nothing else should be added to the queue
+      // CLOSED_MARKER should be the last thing on the queue, therefore when the
+      // background thread sees the marker and exits there should be nothing else
+      // to process... so nothing should be left waiting for the background
+      // thread to do work
+      closed = true;
+      workQueue.add(CLOSED_MARKER);
+      while (!workQueue.isEmpty())
+        try {
+          closeLock.wait();
+        } catch (InterruptedException e) {
+          log.info("Interrupted");
+        }
+    }
+    
+    if (logFile != null)
+      try {
+        logFile.close();
+      } catch (IOException ex) {
+        log.error(ex);
+        throw new LogClosedException();
+      }
+  }
+  
+  public synchronized void defineTablet(int seq, int tid, KeyExtent tablet) throws IOException {
+    // write this log to the METADATA table
+    final LogFileKey key = new LogFileKey();
+    key.event = DEFINE_TABLET;
+    key.seq = seq;
+    key.tid = tid;
+    key.tablet = tablet;
+    try {
+      write(key, EMPTY);
+      sync.invoke(logFile);
+    } catch (Exception ex) {
+      log.error(ex);
+      throw new IOException(ex);
+    }
+  }
+  
+  /**
+   * Writes a key/value pair to the (possibly encrypting) log stream.
+   * 
+   * @param key the log entry key
+   * @param value the log entry value
+   * @throws IOException if the underlying write fails
+   */
+  private synchronized void write(LogFileKey key, LogFileValue value) throws IOException {
+    key.write(encryptingLogFile);
+    value.write(encryptingLogFile);
+  }
+  
+  public LoggerOperation log(int seq, int tid, Mutation mutation) throws IOException {
+    return logManyTablets(Collections.singletonList(new TabletMutations(tid, seq, Collections.singletonList(mutation))));
+  }
+  
+  public LoggerOperation logManyTablets(List<TabletMutations> mutations) throws IOException {
+    DfsLogger.LogWork work = new DfsLogger.LogWork(mutations, new CountDownLatch(1));
+    
+    synchronized (DfsLogger.this) {
+      try {
+        for (TabletMutations tabletMutations : mutations) {
+          LogFileKey key = new LogFileKey();
+          key.event = MANY_MUTATIONS;
+          key.seq = tabletMutations.getSeq();
+          key.tid = tabletMutations.getTid();
+          LogFileValue value = new LogFileValue();
+          value.mutations = tabletMutations.getMutations();
+          write(key, value);
+        }
+      } catch (Exception e) {
+        log.error(e, e);
+        work.exception = e;
+      }
+    }
+    
+    synchronized (closeLock) {
+      // use a different lock for close check so that adding to work queue does not need
+      // to wait on walog I/O operations
+      
+      if (closed)
+        throw new LogClosedException();
+      workQueue.add(work);
+    }
+    
+    return new LoggerOperation(work);
+  }
+  
+  public synchronized void minorCompactionFinished(int seq, int tid, String fqfn) throws IOException {
+    LogFileKey key = new LogFileKey();
+    key.event = COMPACTION_FINISH;
+    key.seq = seq;
+    key.tid = tid;
+    try {
+      write(key, EMPTY);
+    } catch (IOException ex) {
+      log.error(ex);
+      throw ex;
+    }
+  }
+  
+  public synchronized void minorCompactionStarted(int seq, int tid, String fqfn) throws IOException {
+    LogFileKey key = new LogFileKey();
+    key.event = COMPACTION_START;
+    key.seq = seq;
+    key.tid = tid;
+    key.filename = fqfn;
+    try {
+      write(key, EMPTY);
+    } catch (IOException ex) {
+      log.error(ex);
+      throw ex;
+    }
+  }
+  
+}
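
open() above writes the v2 header as the raw magic bytes followed by a writeUTF of the
crypto module class name; the key/value records then follow on the (possibly encrypted)
stream. A minimal reader sketch under those assumptions (class and method names are
illustrative, and it assumes the same package as DfsLogger since LOG_FILE_HEADER_V2 is
package-private):

    final class WalHeader {
      static String readCryptoModuleName(java.io.DataInputStream in) throws java.io.IOException {
        byte[] magic = DfsLogger.LOG_FILE_HEADER_V2.getBytes();
        byte[] buffer = new byte[magic.length];
        in.readFully(buffer);
        if (!java.util.Arrays.equals(buffer, magic))
          throw new java.io.IOException("not a v2 write-ahead log");
        return in.readUTF(); // the class name recorded by open()
      }
    }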

http://git-wip-us.apache.org/repos/asf/accumulo/blob/a10587ed/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java
new file mode 100644
index 0000000..a0dc39d
--- /dev/null
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.tserver.log;
+
+import java.io.DataInputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ThreadPoolExecutor;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.master.thrift.RecoveryStatus;
+import org.apache.accumulo.core.security.crypto.CryptoModuleFactory;
+import org.apache.accumulo.core.security.crypto.CryptoModuleParameters;
+import org.apache.accumulo.core.util.Pair;
+import org.apache.accumulo.core.util.SimpleThreadPool;
+import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.server.zookeeper.DistributedWorkQueue;
+import org.apache.accumulo.server.zookeeper.DistributedWorkQueue.Processor;
+import org.apache.accumulo.tserver.logger.LogFileKey;
+import org.apache.accumulo.tserver.logger.LogFileValue;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.MapFile;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * Sorts recovered write-ahead logs into key order, writing the results as MapFile parts for tablet recovery.
+ */
+public class LogSorter {
+  
+  private static final Logger log = Logger.getLogger(LogSorter.class);
+  VolumeManager fs;
+  AccumuloConfiguration conf;
+  
+  private final Map<String,LogProcessor> currentWork = Collections.synchronizedMap(new HashMap<String,LogProcessor>());
+  
+  class LogProcessor implements Processor {
+    
+    private FSDataInputStream input;
+    private DataInputStream decryptingInput;
+    private long bytesCopied = -1;
+    private long sortStart = 0;
+    private long sortStop = -1;
+    
+    @Override
+    public Processor newProcessor() {
+      return new LogProcessor();
+    }
+    
+    @Override
+    public void process(String child, byte[] data) {
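+      // Each work entry encodes a source log and a destination directory as "src|dest".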
+      String work = new String(data);
+      String[] parts = work.split("\\|");
+      String src = parts[0];
+      String dest = parts[1];
+      String sortId = new Path(src).getName();
+      log.debug("Sorting " + src + " to " + dest + " using sortId " + sortId);
+      
+      synchronized (currentWork) {
+        if (currentWork.containsKey(sortId))
+          return;
+        currentWork.put(sortId, this);
+      }
+      
+      try {
+        log.info("Copying " + src + " to " + dest);
+        sort(sortId, new Path(src), dest);
+      } finally {
+        currentWork.remove(sortId);
+      }
+      
+    }
+    
+    public void sort(String name, Path srcPath, String destPath) {
+      
+      synchronized (this) {
+        sortStart = System.currentTimeMillis();
+      }
+      
+      String formerThreadName = Thread.currentThread().getName();
+      int part = 0;
+      try {
+        
+        // the following call does not throw an exception if the file/dir does not exist
+        fs.deleteRecursively(new Path(destPath));
+        
+        FSDataInputStream tmpInput = fs.open(srcPath);
+
+        byte[] magic = DfsLogger.LOG_FILE_HEADER_V2.getBytes();
+        byte[] magicBuffer = new byte[magic.length];
+        tmpInput.readFully(magicBuffer);
+        if (!Arrays.equals(magicBuffer, magic)) {
+          tmpInput.seek(0);
+          synchronized (this) {
+            this.input = tmpInput;
+            this.decryptingInput = tmpInput;
+          }
+        } else {
+          // We read the crypto module class name here because we need to bootstrap the class. The class itself will read any
+          // additional parameters it needs from the underlying stream.
+          String cryptoModuleClassname = tmpInput.readUTF();
+          org.apache.accumulo.core.security.crypto.CryptoModule cryptoModule = org.apache.accumulo.core.security.crypto.CryptoModuleFactory
+              .getCryptoModule(cryptoModuleClassname);
+          
+          // Create the parameters and set the input stream into those parameters
+          CryptoModuleParameters params = CryptoModuleFactory.createParamsObjectFromAccumuloConfiguration(conf);
+          params.setEncryptedInputStream(tmpInput);
+          
+          // Create the plaintext input stream from the encrypted one
+          params = cryptoModule.getDecryptingInputStream(params);
+          
+          // Store the plaintext input stream into member variables
+          synchronized (this) {
+            this.input = tmpInput;
+            
+            if (params.getPlaintextInputStream() instanceof DataInputStream) {
+              this.decryptingInput = (DataInputStream)params.getPlaintextInputStream();              
+            } else {
+              this.decryptingInput = new DataInputStream(params.getPlaintextInputStream());
+            }
+            
+          }
+          
+        }
+
+        final long bufferSize = conf.getMemoryInBytes(Property.TSERV_SORT_BUFFER_SIZE);
+        Thread.currentThread().setName("Sorting " + name + " for recovery");
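+        // Accumulate key/value pairs until roughly bufferSize bytes of input have been read, then flush each batch as a sorted MapFile part.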
+        while (true) {
+          final ArrayList<Pair<LogFileKey,LogFileValue>> buffer = new ArrayList<Pair<LogFileKey,LogFileValue>>();
+          try {
+            long start = input.getPos();
+            while (input.getPos() - start < bufferSize) {
+              LogFileKey key = new LogFileKey();
+              LogFileValue value = new LogFileValue();
+              key.readFields(decryptingInput);
+              value.readFields(decryptingInput);
+              buffer.add(new Pair<LogFileKey,LogFileValue>(key, value));
+            }
+            writeBuffer(destPath, buffer, part++);
+            buffer.clear();
+          } catch (EOFException ex) {
+            writeBuffer(destPath, buffer, part++);
+            break;
+          }
+        }
+        fs.create(new Path(destPath, "finished")).close();
+        log.info("Finished log sort " + name + " " + getBytesCopied() + " bytes " + part + " parts in " + getSortTime() + "ms");
+      } catch (Throwable t) {
+        try {
+          // parent dir may not exist
+          fs.mkdirs(new Path(destPath));
+          fs.create(new Path(destPath, "failed")).close();
+        } catch (IOException e) {
+          log.error("Error creating failed flag file " + name, e);
+        }
+        log.error(t, t);
+      } finally {
+        Thread.currentThread().setName(formerThreadName);
+        try {
+          close();
+        } catch (Exception e) {
+          log.error("Error during cleanup sort/copy " + name, e);
+        }
+        synchronized (this) {
+          sortStop = System.currentTimeMillis();
+        }
+      }
+    }
+    
+    private void writeBuffer(String destPath, ArrayList<Pair<LogFileKey,LogFileValue>> buffer, int part) throws IOException {
+      Path path = new Path(destPath, String.format("part-r-%05d", part));
+      FileSystem ns = fs.getFileSystemByPath(path);
+      MapFile.Writer output = new MapFile.Writer(ns.getConf(), ns, path.toString(), LogFileKey.class, LogFileValue.class);
+      try {
+        Collections.sort(buffer, new Comparator<Pair<LogFileKey,LogFileValue>>() {
+          @Override
+          public int compare(Pair<LogFileKey,LogFileValue> o1, Pair<LogFileKey,LogFileValue> o2) {
+            return o1.getFirst().compareTo(o2.getFirst());
+          }
+        });
+        for (Pair<LogFileKey,LogFileValue> entry : buffer) {
+          output.append(entry.getFirst(), entry.getSecond());
+        }
+      } finally {
+        output.close();
+      }
+    }
+    
+    synchronized void close() throws IOException {
+      bytesCopied = input.getPos();
+      input.close();
+      decryptingInput.close();
+      input = null;
+    }
+    
+    public synchronized long getSortTime() {
+      if (sortStart > 0) {
+        if (sortStop > 0)
+          return sortStop - sortStart;
+        return System.currentTimeMillis() - sortStart;
+      }
+      return 0;
+    }
+    
+    synchronized long getBytesCopied() throws IOException {
+      return input == null ? bytesCopied : input.getPos();
+    }
+  }
+  
+  ThreadPoolExecutor threadPool;
+  private final Instance instance;
+  
+  public LogSorter(Instance instance, VolumeManager fs, AccumuloConfiguration conf) {
+    this.instance = instance;
+    this.fs = fs;
+    this.conf = conf;
+    int threadPoolSize = conf.getCount(Property.TSERV_RECOVERY_MAX_CONCURRENT);
+    this.threadPool = new SimpleThreadPool(threadPoolSize, this.getClass().getName());
+  }
+  
+  public void startWatchingForRecoveryLogs(ThreadPoolExecutor distWorkQThreadPool) throws KeeperException, InterruptedException {
+    this.threadPool = distWorkQThreadPool;
+    new DistributedWorkQueue(ZooUtil.getRoot(instance) + Constants.ZRECOVERY).startProcessing(new LogProcessor(), this.threadPool);
+  }
+  
+  public List<RecoveryStatus> getLogSorts() {
+    List<RecoveryStatus> result = new ArrayList<RecoveryStatus>();
+    synchronized (currentWork) {
+      for (Entry<String,LogProcessor> entries : currentWork.entrySet()) {
+        RecoveryStatus status = new RecoveryStatus();
+        status.name = entries.getKey();
+        try {
+          status.progress = entries.getValue().getBytesCopied() / (double) conf.getMemoryInBytes(Property.TSERV_WALOG_MAX_SIZE);
+        } catch (IOException ex) {
+          log.warn("Error getting bytes read", ex);
+        }
+        status.runtime = (int) entries.getValue().getSortTime();
+        result.add(status);
+      }
+      return result;
+    }
+  }
+}
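
A minimal usage sketch for the sorter above, assuming an Instance, VolumeManager, AccumuloConfiguration, and thread pool are already in hand (everything not shown in this diff is an assumption):

    // Hedged sketch: wiring a LogSorter into a tablet server at startup.
    LogSorter sorter = new LogSorter(instance, fs, conf);
    sorter.startWatchingForRecoveryLogs(threadPool); // begins consuming "src|dest" work entries
    for (RecoveryStatus status : sorter.getLogSorts()) {
      System.out.println(status.name + " " + status.progress); // poll sort progress for monitoring
    }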

http://git-wip-us.apache.org/repos/asf/accumulo/blob/a10587ed/server/tserver/src/main/java/org/apache/accumulo/tserver/log/MultiReader.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/MultiReader.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/MultiReader.java
new file mode 100644
index 0000000..f82322a
--- /dev/null
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/MultiReader.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.tserver.log;
+
+import java.io.EOFException;
+import java.io.IOException;
+
+import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.commons.collections.buffer.PriorityBuffer;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.MapFile.Reader;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Provides simple MapFile.Reader methods over multiple MapFiles.
+ * 
+ * Presently supports only next() and seek(), and operates on all the MapFile directories within a single directory. The primary purpose of this class is to
+ * merge the results of multiple Reduce jobs that produce MapFile output.
+ */
+@SuppressWarnings({"rawtypes", "unchecked"})
+public class MultiReader {
+  
+  /**
+   * Group together the next key/value from a Reader with the Reader
+   * 
+   */
+  private static class Index implements Comparable<Index> {
+    Reader reader;
+    WritableComparable key;
+    Writable value;
+    boolean cached = false;
+    
+    private static Object create(Class<?> klass) {
+      try {
+        return klass.getConstructor().newInstance();
+      } catch (Throwable t) {
+        throw new RuntimeException("Unable to construct objects to use for comparison", t);
+      }
+    }
+    
+    public Index(Reader reader) {
+      this.reader = reader;
+      key = (WritableComparable) create(reader.getKeyClass());
+      value = (Writable) create(reader.getValueClass());
+    }
+    
+    private void cache() throws IOException {
+      if (!cached && reader.next(key, value)) {
+        cached = true;
+      }
+    }
+    
+    @Override
+    public int compareTo(Index o) {
+      try {
+        cache();
+        o.cache();
+        // no more data: always goes to the end
+        if (!cached)
+          return 1;
+        if (!o.cached)
+          return -1;
+        return key.compareTo(o.key);
+      } catch (IOException ex) {
+        throw new RuntimeException(ex);
+      }
+    }
+  }
+  
+  private PriorityBuffer heap = new PriorityBuffer();
+  
+  public MultiReader(VolumeManager fs, Path directory) throws IOException {
+    boolean foundFinish = false;
+    for (FileStatus child : fs.listStatus(directory)) {
+      if (child.getPath().getName().startsWith("_"))
+        continue;
+      if (child.getPath().getName().equals("finished")) {
+        foundFinish = true;
+        continue;
+      }
+      FileSystem ns = fs.getFileSystemByPath(child.getPath());
+      heap.add(new Index(new Reader(ns, child.getPath().toString(), ns.getConf())));
+    }
+    if (!foundFinish)
+      throw new IOException("Sort \"finished\" flag not found in " + directory);
+  }
+  
+  private static void copy(Writable src, Writable dest) throws IOException {
+    // not exactly efficient...
+    DataOutputBuffer output = new DataOutputBuffer();
+    src.write(output);
+    DataInputBuffer input = new DataInputBuffer();
+    input.reset(output.getData(), output.getLength());
+    dest.readFields(input);
+  }
+  
+  public synchronized boolean next(WritableComparable key, Writable val) throws IOException {
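+    // Pop the reader with the smallest cached entry, copy it out, and re-add the reader so the heap reorders around its next entry.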
+    Index elt = (Index) heap.remove();
+    try {
+      elt.cache();
+      if (elt.cached) {
+        copy(elt.key, key);
+        copy(elt.value, val);
+        elt.cached = false;
+      } else {
+        return false;
+      }
+    } finally {
+      heap.add(elt);
+    }
+    return true;
+  }
+  
+  public synchronized boolean seek(WritableComparable key) throws IOException {
+    PriorityBuffer reheap = new PriorityBuffer(heap.size());
+    boolean result = false;
+    for (Object obj : heap) {
+      Index index = (Index) obj;
+      try {
+        WritableComparable found = index.reader.getClosest(key, index.value, true);
+        if (found != null && found.equals(key)) {
+          result = true;
+        }
+      } catch (EOFException ex) {
+        // thrown if key is beyond all data in the map
+      }
+      index.cached = false;
+      reheap.add(index);
+    }
+    heap = reheap;
+    return result;
+  }
+  
+  public void close() throws IOException {
+    IOException problem = null;
+    for (Object obj : heap) {
+      Index index = (Index) obj;
+      try {
+        index.reader.close();
+      } catch (IOException ex) {
+        problem = ex;
+      }
+    }
+    if (problem != null)
+      throw problem;
+    heap = null;
+  }
+  
+}
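
For orientation, a minimal sketch of merge-reading one sorted-log directory with the class above (the fs handle and recovery path are assumptions; entries arrive in key order, merged across all part files):

    // Hedged sketch: iterating a sorted recovery directory via MultiReader.
    MultiReader reader = new MultiReader(fs, new Path("/recovery/sort-0001")); // hypothetical path
    LogFileKey key = new LogFileKey();
    LogFileValue value = new LogFileValue();
    while (reader.next(key, value)) {
      // each (key, value) is the next entry in merged sort order
    }
    reader.close();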

http://git-wip-us.apache.org/repos/asf/accumulo/blob/a10587ed/server/tserver/src/main/java/org/apache/accumulo/tserver/log/MutationReceiver.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/MutationReceiver.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/MutationReceiver.java
new file mode 100644
index 0000000..46e911a
--- /dev/null
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/MutationReceiver.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.tserver.log;
+
+import org.apache.accumulo.core.data.Mutation;
+
+public interface MutationReceiver {
+  void receive(Mutation m);
+}
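
Since the interface is a single callback, a receiver is typically an inline class; a sketch that just buffers the replayed mutations (the List/ArrayList setup is illustrative):

    // Hedged sketch: a MutationReceiver that collects replayed mutations.
    final List<Mutation> replayed = new ArrayList<Mutation>();
    MutationReceiver collector = new MutationReceiver() {
      @Override
      public void receive(Mutation m) {
        replayed.add(m);
      }
    };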

http://git-wip-us.apache.org/repos/asf/accumulo/blob/a10587ed/server/tserver/src/main/java/org/apache/accumulo/tserver/log/SortedLogRecovery.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/SortedLogRecovery.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/SortedLogRecovery.java
new file mode 100644
index 0000000..3f2d284
--- /dev/null
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/SortedLogRecovery.java
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.tserver.log;
+
+import static org.apache.accumulo.tserver.logger.LogEvents.COMPACTION_FINISH;
+import static org.apache.accumulo.tserver.logger.LogEvents.COMPACTION_START;
+import static org.apache.accumulo.tserver.logger.LogEvents.DEFINE_TABLET;
+import static org.apache.accumulo.tserver.logger.LogEvents.MANY_MUTATIONS;
+import static org.apache.accumulo.tserver.logger.LogEvents.MUTATION;
+import static org.apache.accumulo.tserver.logger.LogEvents.OPEN;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.tserver.logger.LogFileKey;
+import org.apache.accumulo.tserver.logger.LogFileValue;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Logger;
+
+/**
+ * Extract Mutations for a tablet from a set of logs that have been sorted by operation and tablet.
+ * 
+ */
+public class SortedLogRecovery {
+  private static final Logger log = Logger.getLogger(SortedLogRecovery.class);
+  
+  static class EmptyMapFileException extends Exception {
+    private static final long serialVersionUID = 1L;
+
+    public EmptyMapFileException() { super(); }
+  }
+
+  static class UnusedException extends Exception {
+    private static final long serialVersionUID = 1L;
+
+    public UnusedException() { super(); }
+  }
+
+  private VolumeManager fs;
+
+  public SortedLogRecovery(VolumeManager fs) {
+    this.fs = fs;
+  }
+  
+  private enum Status {
+    INITIAL, LOOKING_FOR_FINISH, COMPLETE
+  };
+  
+  private static class LastStartToFinish {
+    long lastStart = -1;
+    long seq = -1;
+    long lastFinish = -1;
+    Status compactionStatus = Status.INITIAL;
+    String tserverSession = "";
+    
+    private void update(long newFinish) {
+      this.seq = this.lastStart;
+      if (newFinish != -1)
+        lastFinish = newFinish;
+    }
+    
+    private void update(int newStartFile, long newStart) {
+      this.lastStart = newStart;
+    }
+    
+    private void update(String newSession) {
+      this.lastStart = -1;
+      this.lastFinish = -1;
+      this.compactionStatus = Status.INITIAL;
+      this.tserverSession = newSession;
+    }
+  }
+  
+  public void recover(KeyExtent extent, List<Path> recoveryLogs, Set<String> tabletFiles, MutationReceiver mr) throws IOException {
+    int[] tids = new int[recoveryLogs.size()];
+    LastStartToFinish lastStartToFinish = new LastStartToFinish();
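+    // First pass: scan every log to find the last minor-compaction start/finish bracket; the replay pass below only applies mutations after that point.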
+    for (int i = 0; i < recoveryLogs.size(); i++) {
+      Path logfile = recoveryLogs.get(i);
+      log.info("Looking at mutations from " + logfile + " for " + extent);
+      MultiReader reader = new MultiReader(fs, logfile);
+      try {
+        try {
+          tids[i] = findLastStartToFinish(reader, i, extent, tabletFiles, lastStartToFinish);
+        } catch (EmptyMapFileException ex) {
+          log.info("Ignoring empty map file " + logfile);
+          tids[i] = -1;
+        } catch (UnusedException ex) {
+          log.info("Ignoring log file " + logfile + " appears to be unused by " + extent);
+          tids[i] = -1;
+        }
+      } finally {
+        try {
+          reader.close();
+        } catch (IOException ex) {
+          log.warn("Ignoring error closing file", ex);
+        }
+      }
+      
+    }
+    
+    if (lastStartToFinish.compactionStatus == Status.LOOKING_FOR_FINISH)
+      throw new RuntimeException("COMPACTION_FINISH (without preceding COMPACTION_START) not followed by successful minor compaction");
+    
+    for (int i = 0; i < recoveryLogs.size(); i++) {
+      Path logfile = recoveryLogs.get(i);
+      MultiReader reader = new MultiReader(fs, logfile);
+      try {
+        playbackMutations(reader, tids[i], lastStartToFinish, mr);
+      } finally {
+        try {
+          reader.close();
+        } catch (IOException ex) {
+          log.warn("Ignoring error closing file", ex);
+        }
+      }
+      log.info("Recovery complete for " + extent + " using " + logfile);
+    }
+  }
+  
+  int findLastStartToFinish(MultiReader reader, int fileno, KeyExtent extent, Set<String> tabletFiles, LastStartToFinish lastStartToFinish) throws IOException, EmptyMapFileException, UnusedException {
+    // Scan for tableId for this extent (should always be in the log)
+    LogFileKey key = new LogFileKey();
+    LogFileValue value = new LogFileValue();
+    int tid = -1;
+    if (!reader.next(key, value))
+      throw new EmptyMapFileException();
+    if (key.event != OPEN)
+      throw new RuntimeException("First log entry value is not OPEN");
+    
+    if (key.tserverSession.compareTo(lastStartToFinish.tserverSession) != 0) {
+      if (lastStartToFinish.compactionStatus == Status.LOOKING_FOR_FINISH)
+        throw new RuntimeException("COMPACTION_FINISH (without preceding COMPACTION_START) is not followed by a successful minor compaction.");
+      lastStartToFinish.update(key.tserverSession);
+    }
+    
+    LogFileKey defineKey = null;
+    
+    // find the maximum tablet id... because a tablet may leave a tserver and then come back, in which case it would have a different tablet id
+    // for the maximum tablet id, find the minimum sequence #... may be ok to find the max seq, but just want to make the code behave like it used to
+    while (reader.next(key, value)) {
+      // LogReader.printEntry(entry);
+      if (key.event != DEFINE_TABLET)
+        break;
+      if (key.tablet.equals(extent)) {
+        if (tid != key.tid) {
+          tid = key.tid;
+          defineKey = key;
+          key = new LogFileKey();
+        }
+      }
+    }
+    if (tid < 0) {
+      throw new UnusedException();
+    }
+    
+    log.debug("Found tid, seq " + tid + " " + defineKey.seq);
+    
+    // Scan start/stop events for this tablet
+    key = defineKey;
+    key.event = COMPACTION_START;
+    reader.seek(key);
+    while (reader.next(key, value)) {
+      // LogFileEntry.printEntry(entry);
+      if (key.tid != tid)
+        break;
+      if (key.event == COMPACTION_START) {
+        if (lastStartToFinish.compactionStatus == Status.INITIAL)
+          lastStartToFinish.compactionStatus = Status.COMPLETE;
+        if (key.seq <= lastStartToFinish.lastStart)
+          throw new RuntimeException("Sequence numbers are not increasing for start/stop events.");
+        lastStartToFinish.update(fileno, key.seq);
+        
+        // Tablet server finished the minor compaction, but didn't remove the entry from the METADATA table.
+        log.error("filename in compaction start " + key.filename);
+        if (tabletFiles.contains(key.filename))
+          lastStartToFinish.update(-1);
+      } else if (key.event == COMPACTION_FINISH) {
+        if (key.seq <= lastStartToFinish.lastStart)
+          throw new RuntimeException("Sequence numbers are not increasing for start/stop events.");
+        if (lastStartToFinish.compactionStatus == Status.INITIAL)
+          lastStartToFinish.compactionStatus = Status.LOOKING_FOR_FINISH;
+        else if (lastStartToFinish.lastFinish > lastStartToFinish.lastStart)
+          throw new RuntimeException("COMPACTION_FINISH does not have preceding COMPACTION_START event.");
+        else
+          lastStartToFinish.compactionStatus = Status.COMPLETE;
+        lastStartToFinish.update(key.seq);
+      } else
+        break;
+    }
+    return tid;
+  }
+  
+  private void playbackMutations(MultiReader reader, int tid, LastStartToFinish lastStartToFinish, MutationReceiver mr) throws IOException {
+    LogFileKey key = new LogFileKey();
+    LogFileValue value = new LogFileValue();
+    
+    // Playback mutations after the last stop to finish
+    log.info("Scanning for mutations starting at sequence number " + lastStartToFinish.seq + " for tid " + tid);
+    key.event = MUTATION;
+    key.tid = tid;
+    // the seq number for the minor compaction start is now the same as the
+    // last update made to memory. Scan up to that mutation, but not past it.
+    key.seq = lastStartToFinish.seq;
+    reader.seek(key);
+    while (true) {
+      if (!reader.next(key, value))
+        break;
+      if (key.tid != tid)
+        break;
+      // log.info("Replaying " + key);
+      // log.info(value);
+      if (key.event == MUTATION) {
+        mr.receive(value.mutations.get(0));
+      } else if (key.event == MANY_MUTATIONS) {
+        for (Mutation m : value.mutations) {
+          mr.receive(m);
+        }
+      } else {
+        throw new RuntimeException("unexpected log key type: " + key.event);
+      }
+    }
+  }
+}
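
A minimal sketch of driving the recovery above during tablet load, assuming the extent, sorted-log paths, and current tablet files come from the metadata table (the tablet.apply call is purely illustrative):

    // Hedged sketch: replaying sorted logs into a tablet's in-memory map.
    SortedLogRecovery recovery = new SortedLogRecovery(fs);
    recovery.recover(extent, sortedLogDirs, tabletFiles, new MutationReceiver() {
      @Override
      public void receive(Mutation m) {
        tablet.apply(m); // hypothetical: apply the mutation to the tablet's in-memory map
      }
    });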

