accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ctubb...@apache.org
Subject [41/53] [abbrv] ACCUMULO-658 consistent package names to avoid overlapped sealed jars
Date Fri, 06 Sep 2013 18:23:09 GMT
http://git-wip-us.apache.org/repos/asf/accumulo/blob/c3ddf9b6/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/TabletServerConstants.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/TabletServerConstants.java b/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/TabletServerConstants.java
deleted file mode 100644
index 2c6858d..0000000
--- a/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/TabletServerConstants.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.server.tabletserver;
-
-public class TabletServerConstants {
-  
-  public static final String IntermediateKeyName = "IntermediateKey";
-  public static final String ColumnSetName = "ColumnSet";
-  public static final String AuthorizationSetName = "AuthorizationSet";
-  public static final String EndKeyName = "EndKey";
-  public static final String MaxResultsName = "MaxResults";
-  public static final String PreviousQueryTypeName = "PreviousQueryType";
-  public static final String PreviousQueryStatusName = "PreviousQueryStatus";
-}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/c3ddf9b6/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/TabletServerResourceManager.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/TabletServerResourceManager.java b/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/TabletServerResourceManager.java
deleted file mode 100644
index 78313e7..0000000
--- a/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/TabletServerResourceManager.java
+++ /dev/null
@@ -1,804 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.server.tabletserver;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.SortedMap;
-import java.util.TreeMap;
-import java.util.TreeSet;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.accumulo.trace.instrument.TraceExecutorService;
-import org.apache.accumulo.core.client.Instance;
-import org.apache.accumulo.core.conf.AccumuloConfiguration;
-import org.apache.accumulo.core.conf.Property;
-import org.apache.accumulo.core.data.KeyExtent;
-import org.apache.accumulo.core.file.blockfile.cache.LruBlockCache;
-import org.apache.accumulo.core.metadata.RootTable;
-import org.apache.accumulo.core.metadata.schema.DataFileValue;
-import org.apache.accumulo.core.util.Daemon;
-import org.apache.accumulo.core.util.LoggingRunnable;
-import org.apache.accumulo.core.util.NamingThreadFactory;
-import org.apache.accumulo.core.util.UtilWaitThread;
-import org.apache.accumulo.server.conf.ServerConfiguration;
-import org.apache.accumulo.server.fs.FileRef;
-import org.apache.accumulo.server.fs.VolumeManager;
-import org.apache.accumulo.server.tabletserver.FileManager.ScanFileManager;
-import org.apache.accumulo.server.tabletserver.Tablet.MajorCompactionReason;
-import org.apache.accumulo.server.tabletserver.Tablet.MinorCompactionReason;
-import org.apache.accumulo.server.util.time.SimpleTimer;
-import org.apache.accumulo.start.classloader.vfs.AccumuloVFSClassLoader;
-import org.apache.log4j.Logger;
-
-/**
- * ResourceManager is responsible for managing the resources of all tablets within a tablet server.
- * 
- * 
- * 
- */
-public class TabletServerResourceManager {
-  
-  private ExecutorService minorCompactionThreadPool;
-  private ExecutorService majorCompactionThreadPool;
-  private ExecutorService rootMajorCompactionThreadPool;
-  private ExecutorService defaultMajorCompactionThreadPool;
-  private ExecutorService splitThreadPool;
-  private ExecutorService defaultSplitThreadPool;
-  private ExecutorService defaultMigrationPool;
-  private ExecutorService migrationPool;
-  private ExecutorService assignmentPool;
-  private ExecutorService assignMetaDataPool;
-  private ExecutorService readAheadThreadPool;
-  private ExecutorService defaultReadAheadThreadPool;
-  private Map<String,ExecutorService> threadPools = new TreeMap<String,ExecutorService>();
-  
-  private HashSet<TabletResourceManager> tabletResources;
-  
-  private FileManager fileManager;
-  
-  private MemoryManager memoryManager;
-  
-  private MemoryManagementFramework memMgmt;
-  
-  private final LruBlockCache _dCache;
-  private final LruBlockCache _iCache;
-  private final ServerConfiguration conf;
-  
-  private static final Logger log = Logger.getLogger(TabletServerResourceManager.class);
-  
-  private ExecutorService addEs(String name, ExecutorService tp) {
-    if (threadPools.containsKey(name)) {
-      throw new IllegalArgumentException("Cannot create two executor services with same name " + name);
-    }
-    tp = new TraceExecutorService(tp);
-    threadPools.put(name, tp);
-    return tp;
-  }
-  
-  private ExecutorService addEs(final Property maxThreads, String name, final ThreadPoolExecutor tp) {
-    ExecutorService result = addEs(name, tp);
-    SimpleTimer.getInstance().schedule(new Runnable() {
-      @Override
-      public void run() {
-        try {
-          int max = conf.getConfiguration().getCount(maxThreads);
-          if (tp.getMaximumPoolSize() != max) {
-            log.info("Changing " + maxThreads.getKey() + " to " + max);
-            tp.setCorePoolSize(max);
-            tp.setMaximumPoolSize(max);
-          }
-        } catch (Throwable t) {
-          log.error(t, t);
-        }
-      }
-      
-    }, 1000, 10 * 1000);
-    return result;
-  }
-
-  private ExecutorService createEs(int max, String name) {
-    return addEs(name, Executors.newFixedThreadPool(max, new NamingThreadFactory(name)));
-  }
-  
-  private ExecutorService createEs(Property max, String name) {
-    return createEs(max, name, new LinkedBlockingQueue<Runnable>());
-  }
-
-  private ExecutorService createEs(Property max, String name, BlockingQueue<Runnable> queue) {
-    int maxThreads = conf.getConfiguration().getCount(max);
-    ThreadPoolExecutor tp = new ThreadPoolExecutor(maxThreads, maxThreads, 0L, TimeUnit.MILLISECONDS, queue, new NamingThreadFactory(name));
-    return addEs(max, name, tp);
-  }
-
-  private ExecutorService createEs(int min, int max, int timeout, String name) {
-    return addEs(name, new ThreadPoolExecutor(min, max, timeout, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new NamingThreadFactory(name)));
-  }
-  
-  public TabletServerResourceManager(Instance instance, VolumeManager fs) {
-    this.conf = new ServerConfiguration(instance);
-    final AccumuloConfiguration acuConf = conf.getConfiguration();
-    
-    long maxMemory = acuConf.getMemoryInBytes(Property.TSERV_MAXMEM);
-    boolean usingNativeMap = acuConf.getBoolean(Property.TSERV_NATIVEMAP_ENABLED) && NativeMap.loadedNativeLibraries();
-    
-    long blockSize = acuConf.getMemoryInBytes(Property.TSERV_DEFAULT_BLOCKSIZE);
-    long dCacheSize = acuConf.getMemoryInBytes(Property.TSERV_DATACACHE_SIZE);
-    long iCacheSize = acuConf.getMemoryInBytes(Property.TSERV_INDEXCACHE_SIZE);
-    
-    _iCache = new LruBlockCache(iCacheSize, blockSize);
-    _dCache = new LruBlockCache(dCacheSize, blockSize);
-    
-    Runtime runtime = Runtime.getRuntime();
-    if (!usingNativeMap && maxMemory + dCacheSize + iCacheSize > runtime.maxMemory()) {
-      throw new IllegalArgumentException(String.format(
-          "Maximum tablet server map memory %,d and block cache sizes %,d is too large for this JVM configuration %,d", maxMemory, dCacheSize + iCacheSize,
-          runtime.maxMemory()));
-    }
-    runtime.gc();
-
-    // totalMemory - freeMemory = memory in use
-    // maxMemory - memory in use = max available memory
-    if (!usingNativeMap && maxMemory > runtime.maxMemory() - (runtime.totalMemory() - runtime.freeMemory())) {
-      log.warn("In-memory map may not fit into local memory space.");
-    }
-    
-    minorCompactionThreadPool = createEs(Property.TSERV_MINC_MAXCONCURRENT, "minor compactor");
-    
-    // make this thread pool have a priority queue... and execute tablets with the most
-    // files first!
-    majorCompactionThreadPool = createEs(Property.TSERV_MAJC_MAXCONCURRENT, "major compactor", new CompactionQueue());
-    rootMajorCompactionThreadPool = createEs(0, 1, 300, "md root major compactor");
-    defaultMajorCompactionThreadPool = createEs(0, 1, 300, "md major compactor");
-    
-    splitThreadPool = createEs(1, "splitter");
-    defaultSplitThreadPool = createEs(0, 1, 60, "md splitter");
-    
-    defaultMigrationPool = createEs(0, 1, 60, "metadata tablet migration");
-    migrationPool = createEs(Property.TSERV_MIGRATE_MAXCONCURRENT, "tablet migration");
-    
-    // not sure if concurrent assignments can run safely... even if they could there is probably no benefit at startup because
-    // individual tablet servers are already running assignments concurrently... having each individual tablet server run
-    // concurrent assignments would put more load on the metadata table at startup
-    assignmentPool = createEs(1, "tablet assignment");
-    
-    assignMetaDataPool = createEs(0, 1, 60, "metadata tablet assignment");
-    
-    readAheadThreadPool = createEs(Property.TSERV_READ_AHEAD_MAXCONCURRENT, "tablet read ahead");
-    defaultReadAheadThreadPool = createEs(Property.TSERV_METADATA_READ_AHEAD_MAXCONCURRENT, "metadata tablets read ahead");
-    
-    tabletResources = new HashSet<TabletResourceManager>();
-    
-    int maxOpenFiles = acuConf.getCount(Property.TSERV_SCAN_MAX_OPENFILES);
-    
-    fileManager = new FileManager(conf, fs, maxOpenFiles, _dCache, _iCache);
-    
-    try {
-      Class<? extends MemoryManager> clazz = AccumuloVFSClassLoader.loadClass(acuConf.get(Property.TSERV_MEM_MGMT), MemoryManager.class);
-      memoryManager = clazz.newInstance();
-      memoryManager.init(conf);
-      log.debug("Loaded memory manager : " + memoryManager.getClass().getName());
-    } catch (Exception e) {
-      log.error("Failed to find memory manger in config, using default", e);
-    }
-    
-    if (memoryManager == null) {
-      memoryManager = new LargestFirstMemoryManager();
-    }
-    
-    memMgmt = new MemoryManagementFramework();
-  }
-  
-  private static class TabletStateImpl implements TabletState, Cloneable {
-    
-    private long lct;
-    private Tablet tablet;
-    private long mts;
-    private long mcmts;
-    
-    public TabletStateImpl(Tablet t, long mts, long lct, long mcmts) {
-      this.tablet = t;
-      this.mts = mts;
-      this.lct = lct;
-      this.mcmts = mcmts;
-    }
-    
-    public KeyExtent getExtent() {
-      return tablet.getExtent();
-    }
-    
-    Tablet getTablet() {
-      return tablet;
-    }
-    
-    public long getLastCommitTime() {
-      return lct;
-    }
-    
-    public long getMemTableSize() {
-      return mts;
-    }
-    
-    public long getMinorCompactingMemTableSize() {
-      return mcmts;
-    }
-  }
-  
-  private class MemoryManagementFramework {
-    private final Map<KeyExtent,TabletStateImpl> tabletReports;
-    private LinkedBlockingQueue<TabletStateImpl> memUsageReports;
-    private long lastMemCheckTime = System.currentTimeMillis();
-    private long maxMem;
-    
-    MemoryManagementFramework() {
-      tabletReports = Collections.synchronizedMap(new HashMap<KeyExtent,TabletStateImpl>());
-      memUsageReports = new LinkedBlockingQueue<TabletStateImpl>();
-      maxMem = conf.getConfiguration().getMemoryInBytes(Property.TSERV_MAXMEM);
-      
-      Runnable r1 = new Runnable() {
-        public void run() {
-          processTabletMemStats();
-        }
-      };
-      
-      Thread t1 = new Daemon(new LoggingRunnable(log, r1));
-      t1.setPriority(Thread.NORM_PRIORITY + 1);
-      t1.setName("Accumulo Memory Guard");
-      t1.start();
-      
-      Runnable r2 = new Runnable() {
-        public void run() {
-          manageMemory();
-        }
-      };
-      
-      Thread t2 = new Daemon(new LoggingRunnable(log, r2));
-      t2.setName("Accumulo Minor Compaction Initiator");
-      t2.start();
-      
-    }
-    
-    private long lastMemTotal = 0;
-    
-    private void processTabletMemStats() {
-      while (true) {
-        try {
-          
-          TabletStateImpl report = memUsageReports.take();
-          
-          while (report != null) {
-            tabletReports.put(report.getExtent(), report);
-            report = memUsageReports.poll();
-          }
-          
-          long delta = System.currentTimeMillis() - lastMemCheckTime;
-          if (holdCommits || delta > 50 || lastMemTotal > 0.90 * maxMem) {
-            lastMemCheckTime = System.currentTimeMillis();
-            
-            long totalMemUsed = 0;
-            
-            synchronized (tabletReports) {
-              for (TabletStateImpl tsi : tabletReports.values()) {
-                totalMemUsed += tsi.getMemTableSize();
-                totalMemUsed += tsi.getMinorCompactingMemTableSize();
-              }
-            }
-            
-            if (totalMemUsed > 0.95 * maxMem) {
-              holdAllCommits(true);
-            } else {
-              holdAllCommits(false);
-            }
-            
-            lastMemTotal = totalMemUsed;
-          }
-          
-        } catch (InterruptedException e) {
-          log.warn(e, e);
-        }
-      }
-    }
-    
-    private void manageMemory() {
-      while (true) {
-        MemoryManagementActions mma = null;
-        
-        try {
-          ArrayList<TabletState> tablets;
-          synchronized (tabletReports) {
-            tablets = new ArrayList<TabletState>(tabletReports.values());
-          }
-          mma = memoryManager.getMemoryManagementActions(tablets);
-          
-        } catch (Throwable t) {
-          log.error("Memory manager failed " + t.getMessage(), t);
-        }
-        
-        try {
-          if (mma != null && mma.tabletsToMinorCompact != null && mma.tabletsToMinorCompact.size() > 0) {
-            for (KeyExtent keyExtent : mma.tabletsToMinorCompact) {
-              TabletStateImpl tabletReport = tabletReports.get(keyExtent);
-              
-              if (tabletReport == null) {
-                log.warn("Memory manager asked to compact nonexistant tablet " + keyExtent);
-                continue;
-              }
-              
-              if (!tabletReport.getTablet().initiateMinorCompaction(MinorCompactionReason.SYSTEM)) {
-                if (tabletReport.getTablet().isClosed()) {
-                  tabletReports.remove(tabletReport.getExtent());
-                  log.debug("Ignoring memory manager recommendation: not minor compacting closed tablet " + keyExtent);
-                } else {
-                  log.info("Ignoring memory manager recommendation: not minor compacting " + keyExtent);
-                }
-              }
-            }
-            
-            // log.debug("mma.tabletsToMinorCompact = "+mma.tabletsToMinorCompact);
-          }
-        } catch (Throwable t) {
-          log.error("Minor compactions for memory managment failed", t);
-        }
-        
-        UtilWaitThread.sleep(250);
-      }
-    }
-    
-    public void updateMemoryUsageStats(Tablet tablet, long size, long lastCommitTime, long mincSize) {
-      memUsageReports.add(new TabletStateImpl(tablet, size, lastCommitTime, mincSize));
-    }
-    
-    public void tabletClosed(KeyExtent extent) {
-      tabletReports.remove(extent);
-    }
-  }
-  
-  private final Object commitHold = new Object();
-  private volatile boolean holdCommits = false;
-  private long holdStartTime;
-  
-  protected void holdAllCommits(boolean holdAllCommits) {
-    synchronized (commitHold) {
-      if (holdCommits != holdAllCommits) {
-        holdCommits = holdAllCommits;
-        
-        if (holdCommits) {
-          holdStartTime = System.currentTimeMillis();
-        }
-        
-        if (!holdCommits) {
-          log.debug(String.format("Commits held for %6.2f secs", (System.currentTimeMillis() - holdStartTime) / 1000.0));
-          commitHold.notifyAll();
-        }
-      }
-    }
-    
-  }
-  
-  void waitUntilCommitsAreEnabled() {
-    if (holdCommits) {
-      long timeout = System.currentTimeMillis() + conf.getConfiguration().getTimeInMillis(Property.GENERAL_RPC_TIMEOUT);
-      synchronized (commitHold) {
-        while (holdCommits) {
-          try {
-            if (System.currentTimeMillis() > timeout)
-              throw new HoldTimeoutException("Commits are held");
-            commitHold.wait(1000);
-          } catch (InterruptedException e) {}
-        }
-      }
-    }
-  }
-  
-  public long holdTime() {
-    if (!holdCommits)
-      return 0;
-    synchronized (commitHold) {
-      return System.currentTimeMillis() - holdStartTime;
-    }
-  }
-  
-  public void close() {
-    for (ExecutorService executorService : threadPools.values()) {
-      executorService.shutdown();
-    }
-    
-    for (Entry<String,ExecutorService> entry : threadPools.entrySet()) {
-      while (true) {
-        try {
-          if (entry.getValue().awaitTermination(60, TimeUnit.SECONDS))
-            break;
-          log.info("Waiting for thread pool " + entry.getKey() + " to shutdown");
-        } catch (InterruptedException e) {
-          log.warn(e);
-        }
-      }
-    }
-  }
-  
-  public synchronized TabletResourceManager createTabletResourceManager() {
-    TabletResourceManager trm = new TabletResourceManager();
-    return trm;
-  }
-  
-  synchronized private void addTabletResource(TabletResourceManager tr) {
-    tabletResources.add(tr);
-  }
-  
-  synchronized private void removeTabletResource(TabletResourceManager tr) {
-    tabletResources.remove(tr);
-  }
-  
-  private class MapFileInfo {
-    private final FileRef path;
-    private final long size;
-    
-    MapFileInfo(FileRef path, long size) {
-      this.path = path;
-      this.size = size;
-    }
-  }
-  
-  public class TabletResourceManager {
-    
-    private final long creationTime = System.currentTimeMillis();
-    
-    private volatile boolean openFilesReserved = false;
-    
-    private volatile boolean closed = false;
-    
-    private Tablet tablet;
-    
-    private AccumuloConfiguration tableConf;
-    
-    TabletResourceManager() {}
-    
-    void setTablet(Tablet tablet, AccumuloConfiguration tableConf) {
-      this.tablet = tablet;
-      this.tableConf = tableConf;
-      // TabletResourceManager is not really initialized until this
-      // function is called.... so do not make it publicly available
-      // until now
-      
-      addTabletResource(this);
-    }
-    
-    // BEGIN methods that Tablets call to manage their set of open map files
-    
-    public void importedMapFiles() {
-      lastReportedCommitTime = System.currentTimeMillis();
-    }
-    
-    synchronized ScanFileManager newScanFileManager() {
-      if (closed)
-        throw new IllegalStateException("closed");
-      return fileManager.newScanFileManager(tablet.getExtent());
-    }
-    
-    // END methods that Tablets call to manage their set of open map files
-    
-    // BEGIN methods that Tablets call to manage memory
-    
-    private AtomicLong lastReportedSize = new AtomicLong();
-    private AtomicLong lastReportedMincSize = new AtomicLong();
-    private volatile long lastReportedCommitTime = 0;
-    
-    public void updateMemoryUsageStats(long size, long mincSize) {
-      
-      // do not want to update stats for every little change,
-      // so only do it under certain circumstances... the reason
-      // for this is that reporting stats acquires a lock, do
-      // not want all tablets locking on the same lock for every
-      // commit
-      long totalSize = size + mincSize;
-      long lrs = lastReportedSize.get();
-      long delta = totalSize - lrs;
-      long lrms = lastReportedMincSize.get();
-      boolean report = false;
-      // the atomic longs are considered independently, when one is set
-      // the other is not set intentionally because this method is not
-      // synchronized... therefore there are not transactional semantics
-      // for reading and writing two variables
-      if ((lrms > 0 && mincSize == 0 || lrms == 0 && mincSize > 0) && lastReportedMincSize.compareAndSet(lrms, mincSize)) {
-        report = true;
-      }
-      
-      long currentTime = System.currentTimeMillis();
-      if ((delta > 32000 || delta < 0 || (currentTime - lastReportedCommitTime > 1000)) && lastReportedSize.compareAndSet(lrs, totalSize)) {
-        if (delta > 0)
-          lastReportedCommitTime = currentTime;
-        report = true;
-      }
-      
-      if (report)
-        memMgmt.updateMemoryUsageStats(tablet, size, lastReportedCommitTime, mincSize);
-    }
-    
-    // END methods that Tablets call to manage memory
-    
-    // BEGIN methods that Tablets call to make decisions about major compaction
-    // when too many files are open, we may want tablets to compact down
-    // to one map file
-    Map<FileRef,Long> findMapFilesToCompact(SortedMap<FileRef,DataFileValue> tabletFiles, MajorCompactionReason reason) {
-      if (reason == MajorCompactionReason.USER) {
-        Map<FileRef,Long> files = new HashMap<FileRef,Long>();
-        for (Entry<FileRef,DataFileValue> entry : tabletFiles.entrySet()) {
-          files.put(entry.getKey(), entry.getValue().getSize());
-        }
-        return files;
-      }
-      
-      if (tabletFiles.size() <= 1)
-        return null;
-      TreeSet<MapFileInfo> candidateFiles = new TreeSet<MapFileInfo>(new Comparator<MapFileInfo>() {
-        @Override
-        public int compare(MapFileInfo o1, MapFileInfo o2) {
-          if (o1 == o2)
-            return 0;
-          if (o1.size < o2.size)
-            return -1;
-          if (o1.size > o2.size)
-            return 1;
-          return o1.path.compareTo(o2.path);
-        }
-      });
-      
-      double ratio = tableConf.getFraction(Property.TABLE_MAJC_RATIO);
-      int maxFilesToCompact = tableConf.getCount(Property.TSERV_MAJC_THREAD_MAXOPEN);
-      int maxFilesPerTablet = tableConf.getMaxFilesPerTablet();
-      
-      for (Entry<FileRef,DataFileValue> entry : tabletFiles.entrySet()) {
-        candidateFiles.add(new MapFileInfo(entry.getKey(), entry.getValue().getSize()));
-      }
-      
-      long totalSize = 0;
-      for (MapFileInfo mfi : candidateFiles) {
-        totalSize += mfi.size;
-      }
-      
-      Map<FileRef,Long> files = new HashMap<FileRef,Long>();
-      
-      while (candidateFiles.size() > 1) {
-        MapFileInfo max = candidateFiles.last();
-        if (max.size * ratio <= totalSize) {
-          files.clear();
-          for (MapFileInfo mfi : candidateFiles) {
-            files.put(mfi.path, mfi.size);
-            if (files.size() >= maxFilesToCompact)
-              break;
-          }
-          
-          break;
-        }
-        totalSize -= max.size;
-        candidateFiles.remove(max);
-      }
-      
-      int totalFilesToCompact = 0;
-      if (tabletFiles.size() > maxFilesPerTablet)
-        totalFilesToCompact = tabletFiles.size() - maxFilesPerTablet + 1;
-      
-      totalFilesToCompact = Math.min(totalFilesToCompact, maxFilesToCompact);
-      
-      if (files.size() < totalFilesToCompact) {
-        
-        TreeMap<FileRef,DataFileValue> tfc = new TreeMap<FileRef,DataFileValue>(tabletFiles);
-        tfc.keySet().removeAll(files.keySet());
-        
-        // put data in candidateFiles to sort it
-        candidateFiles.clear();
-        for (Entry<FileRef,DataFileValue> entry : tfc.entrySet())
-          candidateFiles.add(new MapFileInfo(entry.getKey(), entry.getValue().getSize()));
-        
-        for (MapFileInfo mfi : candidateFiles) {
-          files.put(mfi.path, mfi.size);
-          if (files.size() >= totalFilesToCompact)
-            break;
-        }
-      }
-      
-      if (files.size() == 0)
-        return null;
-      
-      return files;
-    }
-    
-    boolean needsMajorCompaction(SortedMap<FileRef,DataFileValue> tabletFiles, MajorCompactionReason reason) {
-      if (closed)
-        return false;// throw new IOException("closed");
-        
-      // int threshold;
-      
-      if (reason == MajorCompactionReason.USER)
-        return true;
-      
-      if (reason == MajorCompactionReason.IDLE) {
-        // threshold = 1;
-        long idleTime;
-        if (lastReportedCommitTime == 0) {
-          // no commits, so compute how long the tablet has been assigned to the
-          // tablet server
-          idleTime = System.currentTimeMillis() - creationTime;
-        } else {
-          idleTime = System.currentTimeMillis() - lastReportedCommitTime;
-        }
-        
-        if (idleTime < tableConf.getTimeInMillis(Property.TABLE_MAJC_COMPACTALL_IDLETIME)) {
-          return false;
-        }
-      }/*
-        * else{ threshold = tableConf.getCount(Property.TABLE_MAJC_THRESHOLD); }
-        */
-      
-      return findMapFilesToCompact(tabletFiles, reason) != null;
-    }
-    
-    // END methods that Tablets call to make decisions about major compaction
-    
-    // tablets call this method to run minor compactions,
-    // this allows us to control how many minor compactions
-    // run concurrently in a tablet server
-    void executeMinorCompaction(final Runnable r) {
-      minorCompactionThreadPool.execute(new LoggingRunnable(log, r));
-    }
-    
-    void close() throws IOException {
-      // always obtain locks in same order to avoid deadlock
-      synchronized (TabletServerResourceManager.this) {
-        synchronized (this) {
-          if (closed)
-            throw new IOException("closed");
-          if (openFilesReserved)
-            throw new IOException("tired to close files while open files reserved");
-          
-          TabletServerResourceManager.this.removeTabletResource(this);
-          
-          memMgmt.tabletClosed(tablet.getExtent());
-          memoryManager.tabletClosed(tablet.getExtent());
-          
-          closed = true;
-        }
-      }
-    }
-    
-    public TabletServerResourceManager getTabletServerResourceManager() {
-      return TabletServerResourceManager.this;
-    }
-    
-    public void executeMajorCompaction(KeyExtent tablet, Runnable compactionTask) {
-      TabletServerResourceManager.this.executeMajorCompaction(tablet, compactionTask);
-    }
-    
-  }
-  
-  public void executeSplit(KeyExtent tablet, Runnable splitTask) {
-    if (tablet.isMeta()) {
-      if (tablet.isRootTablet()) {
-        log.warn("Saw request to split root tablet, ignoring");
-        return;
-      }
-      defaultSplitThreadPool.execute(splitTask);
-    } else {
-      splitThreadPool.execute(splitTask);
-    }
-  }
-  
-  public void executeMajorCompaction(KeyExtent tablet, Runnable compactionTask) {
-    if (tablet.equals(RootTable.EXTENT)) {
-      rootMajorCompactionThreadPool.execute(compactionTask);
-    } else if (tablet.isMeta()) {
-      defaultMajorCompactionThreadPool.execute(compactionTask);
-    } else {
-      majorCompactionThreadPool.execute(compactionTask);
-    }
-  }
-  
-  public void executeReadAhead(KeyExtent tablet, Runnable task) {
-    if (tablet.isRootTablet()) {
-      task.run();
-    } else if (tablet.isMeta()) {
-      defaultReadAheadThreadPool.execute(task);
-    } else {
-      readAheadThreadPool.execute(task);
-    }
-  }
-  
-  public void addAssignment(Runnable assignmentHandler) {
-    assignmentPool.execute(assignmentHandler);
-  }
-  
-  public void addMetaDataAssignment(Runnable assignmentHandler) {
-    assignMetaDataPool.execute(assignmentHandler);
-  }
-  
-  public void addMigration(KeyExtent tablet, Runnable migrationHandler) {
-    if (tablet.isRootTablet()) {
-      migrationHandler.run();
-    } else if (tablet.isMeta()) {
-      defaultMigrationPool.execute(migrationHandler);
-    } else {
-      migrationPool.execute(migrationHandler);
-    }
-  }
-  
-  public void stopSplits() {
-    splitThreadPool.shutdown();
-    defaultSplitThreadPool.shutdown();
-    while (true) {
-      try {
-        while (!splitThreadPool.awaitTermination(1, TimeUnit.MINUTES)) {
-          log.info("Waiting for metadata split thread pool to stop");
-        }
-        while (!defaultSplitThreadPool.awaitTermination(1, TimeUnit.MINUTES)) {
-          log.info("Waiting for split thread pool to stop");
-        }
-        break;
-      } catch (InterruptedException ex) {
-        log.info(ex, ex);
-      }
-    }
-  }
-  
-  public void stopNormalAssignments() {
-    assignmentPool.shutdown();
-    while (true) {
-      try {
-        while (!assignmentPool.awaitTermination(1, TimeUnit.MINUTES)) {
-          log.info("Waiting for assignment thread pool to stop");
-        }
-        break;
-      } catch (InterruptedException ex) {
-        log.info(ex, ex);
-      }
-    }
-  }
-  
-  public void stopMetadataAssignments() {
-    assignMetaDataPool.shutdown();
-    while (true) {
-      try {
-        while (!assignMetaDataPool.awaitTermination(1, TimeUnit.MINUTES)) {
-          log.info("Waiting for metadata assignment thread pool to stop");
-        }
-        break;
-      } catch (InterruptedException ex) {
-        log.info(ex, ex);
-      }
-    }
-  }
-  
-  public LruBlockCache getIndexCache() {
-    return _iCache;
-  }
-  
-  public LruBlockCache getDataCache() {
-    return _dCache;
-  }
-  
-}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/c3ddf9b6/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/TabletState.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/TabletState.java b/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/TabletState.java
deleted file mode 100644
index aeacb8d..0000000
--- a/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/TabletState.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.server.tabletserver;
-
-import org.apache.accumulo.core.data.KeyExtent;
-
-public interface TabletState {
-  KeyExtent getExtent();
-  
-  long getLastCommitTime();
-  
-  long getMemTableSize();
-  
-  long getMinorCompactingMemTableSize();
-}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/c3ddf9b6/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/TabletStatsKeeper.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/TabletStatsKeeper.java b/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/TabletStatsKeeper.java
deleted file mode 100644
index 89ea6ac..0000000
--- a/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/TabletStatsKeeper.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.server.tabletserver;
-
-import org.apache.accumulo.core.tabletserver.thrift.ActionStats;
-import org.apache.accumulo.core.tabletserver.thrift.TabletStats;
-import org.apache.accumulo.server.util.ActionStatsUpdator;
-
-public class TabletStatsKeeper {
-  
-  private ActionStats major = new ActionStats();
-  private ActionStats minor = new ActionStats();
-  private ActionStats split = new ActionStats();
-  
-  public enum Operation {
-    MAJOR, SPLIT, MINOR
-  }
-  
-  private ActionStats[] map = new ActionStats[] {major, split, minor};
-  
-  public void updateTime(Operation operation, long queued, long start, long count, boolean failed) {
-    try {
-      ActionStats data = map[operation.ordinal()];
-      if (failed) {
-        data.fail++;
-        data.status--;
-      } else {
-        double t = (System.currentTimeMillis() - start) / 1000.0;
-        double q = (start - queued) / 1000.0;
-        
-        data.status--;
-        data.count += count;
-        data.num++;
-        data.elapsed += t;
-        data.queueTime += q;
-        data.sumDev += t * t;
-        data.queueSumDev += q * q;
-        if (data.elapsed < 0 || data.sumDev < 0 || data.queueSumDev < 0 || data.queueTime < 0)
-          resetTimes();
-      }
-    } catch (Exception E) {
-      resetTimes();
-    }
-    
-  }
-  
-  public void updateTime(Operation operation, long start, long count, boolean failed) {
-    try {
-      ActionStats data = map[operation.ordinal()];
-      if (failed) {
-        data.fail++;
-        data.status--;
-      } else {
-        double t = (System.currentTimeMillis() - start) / 1000.0;
-        
-        data.status--;
-        data.num++;
-        data.elapsed += t;
-        data.sumDev += t * t;
-        
-        if (data.elapsed < 0 || data.sumDev < 0 || data.queueSumDev < 0 || data.queueTime < 0)
-          resetTimes();
-      }
-    } catch (Exception E) {
-      resetTimes();
-    }
-    
-  }
-  
-  public void saveMinorTimes(TabletStatsKeeper t) {
-    ActionStatsUpdator.update(minor, t.minor);
-  }
-  
-  public void saveMajorTimes(TabletStatsKeeper t) {
-    ActionStatsUpdator.update(major, t.major);
-  }
-  
-  public void resetTimes() {
-    major = new ActionStats();
-    split = new ActionStats();
-    minor = new ActionStats();
-  }
-  
-  public void incrementStatusMinor() {
-    minor.status++;
-  }
-  
-  public void incrementStatusMajor() {
-    major.status++;
-  }
-  
-  public void incrementStatusSplit() {
-    split.status++;
-  }
-  
-  public TabletStats getTabletStats() {
-    return new TabletStats(null, major, minor, split, 0, 0, 0, 0);
-  }
-}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/c3ddf9b6/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/TooManyFilesException.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/TooManyFilesException.java b/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/TooManyFilesException.java
deleted file mode 100644
index b6d63ad..0000000
--- a/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/TooManyFilesException.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.server.tabletserver;
-
-import java.io.IOException;
-
-public class TooManyFilesException extends IOException {
-  
-  private static final long serialVersionUID = 1L;
-  
-  public TooManyFilesException(String msg) {
-    super(msg);
-  }
-}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/c3ddf9b6/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/log/DfsLogger.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/log/DfsLogger.java b/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/log/DfsLogger.java
deleted file mode 100644
index 113e256..0000000
--- a/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/log/DfsLogger.java
+++ /dev/null
@@ -1,455 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.server.tabletserver.log;
-
-import static org.apache.accumulo.tserver.logger.LogEvents.COMPACTION_FINISH;
-import static org.apache.accumulo.tserver.logger.LogEvents.COMPACTION_START;
-import static org.apache.accumulo.tserver.logger.LogEvents.DEFINE_TABLET;
-import static org.apache.accumulo.tserver.logger.LogEvents.MANY_MUTATIONS;
-import static org.apache.accumulo.tserver.logger.LogEvents.OPEN;
-
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.lang.reflect.Method;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import org.apache.accumulo.core.conf.AccumuloConfiguration;
-import org.apache.accumulo.core.conf.Property;
-import org.apache.accumulo.core.data.KeyExtent;
-import org.apache.accumulo.core.data.Mutation;
-import org.apache.accumulo.core.security.crypto.CryptoModuleFactory;
-import org.apache.accumulo.core.security.crypto.CryptoModuleParameters;
-import org.apache.accumulo.core.util.Daemon;
-import org.apache.accumulo.core.util.StringUtil;
-import org.apache.accumulo.master.state.TServerInstance;
-import org.apache.accumulo.server.ServerConstants;
-import org.apache.accumulo.server.fs.VolumeManager;
-import org.apache.accumulo.server.tabletserver.TabletMutations;
-import org.apache.accumulo.tserver.logger.LogFileKey;
-import org.apache.accumulo.tserver.logger.LogFileValue;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.Path;
-import org.apache.log4j.Logger;
-
-/**
- * Wrap a connection to a logger.
- * 
- */
-public class DfsLogger {
-  // Package private so that LogSorter can find this
-  static final String LOG_FILE_HEADER_V2 = "--- Log File Header (v2) ---";
-  
-  private static Logger log = Logger.getLogger(DfsLogger.class);
-  
-  public static class LogClosedException extends IOException {
-    private static final long serialVersionUID = 1L;
-    
-    public LogClosedException() {
-      super("LogClosed");
-    }
-  }
-  
-  public interface ServerResources {
-    AccumuloConfiguration getConfiguration();
-    
-    VolumeManager getFileSystem();
-    
-    Set<TServerInstance> getCurrentTServers();
-  }
-  
-  private final LinkedBlockingQueue<DfsLogger.LogWork> workQueue = new LinkedBlockingQueue<DfsLogger.LogWork>();
-  
-  private final Object closeLock = new Object();
-  
-  private static final DfsLogger.LogWork CLOSED_MARKER = new DfsLogger.LogWork(null, null);
-  
-  private static final LogFileValue EMPTY = new LogFileValue();
-  
-  private boolean closed = false;
-  
-  private class LogSyncingTask implements Runnable {
-    
-    @Override
-    public void run() {
-      ArrayList<DfsLogger.LogWork> work = new ArrayList<DfsLogger.LogWork>();
-      while (true) {
-        work.clear();
-        
-        try {
-          work.add(workQueue.take());
-        } catch (InterruptedException ex) {
-          continue;
-        }
-        workQueue.drainTo(work);
-        
-        synchronized (closeLock) {
-          if (!closed) {
-            try {
-              sync.invoke(logFile);
-            } catch (Exception ex) {
-              log.warn("Exception syncing " + ex);
-              for (DfsLogger.LogWork logWork : work) {
-                logWork.exception = ex;
-              }
-            }
-          } else {
-            for (DfsLogger.LogWork logWork : work) {
-              logWork.exception = new LogClosedException();
-            }
-          }
-        }
-        
-        boolean sawClosedMarker = false;
-        for (DfsLogger.LogWork logWork : work)
-          if (logWork == CLOSED_MARKER)
-            sawClosedMarker = true;
-          else
-            logWork.latch.countDown();
-        
-        if (sawClosedMarker) {
-          synchronized (closeLock) {
-            closeLock.notifyAll();
-          }
-          break;
-        }
-      }
-    }
-  }
-  
-  static class LogWork {
-    List<TabletMutations> mutations;
-    CountDownLatch latch;
-    volatile Exception exception;
-    
-    public LogWork(List<TabletMutations> mutations, CountDownLatch latch) {
-      this.mutations = mutations;
-      this.latch = latch;
-    }
-  }
-  
-  public static class LoggerOperation {
-    private final LogWork work;
-    
-    public LoggerOperation(LogWork work) {
-      this.work = work;
-    }
-    
-    public void await() throws IOException {
-      try {
-        work.latch.await();
-      } catch (InterruptedException e) {
-        throw new RuntimeException(e);
-      }
-      
-      if (work.exception != null) {
-        if (work.exception instanceof IOException)
-          throw (IOException) work.exception;
-        else if (work.exception instanceof RuntimeException)
-          throw (RuntimeException) work.exception;
-        else
-          throw new RuntimeException(work.exception);
-      }
-    }
-  }
-  
-  @Override
-  public boolean equals(Object obj) {
-    // filename is unique
-    if (obj == null)
-      return false;
-    if (obj instanceof DfsLogger)
-      return getFileName().equals(((DfsLogger) obj).getFileName());
-    return false;
-  }
-  
-  @Override
-  public int hashCode() {
-    // filename is unique
-    return getFileName().hashCode();
-  }
-  
-  private final ServerResources conf;
-  private FSDataOutputStream logFile;
-  private DataOutputStream encryptingLogFile = null;
-  private Method sync;
-  private Path logPath;
-  private String logger;
-  
-  public DfsLogger(ServerResources conf) throws IOException {
-    this.conf = conf;
-  }
-  
-  public DfsLogger(ServerResources conf, String logger, Path filename) throws IOException {
-    this.conf = conf;
-    this.logger = logger;
-    this.logPath = filename;
-  }
-  
-  public static FSDataInputStream readHeader(VolumeManager fs, Path path, Map<String,String> opts) throws IOException {
-    FSDataInputStream file = fs.open(path);
-    try {
-      byte[] magic = LOG_FILE_HEADER_V2.getBytes();
-      byte[] buffer = new byte[magic.length];
-      file.readFully(buffer);
-      if (Arrays.equals(buffer, magic)) {
-        int count = file.readInt();
-        for (int i = 0; i < count; i++) {
-          String key = file.readUTF();
-          String value = file.readUTF();
-          opts.put(key, value);
-        }
-      } else {
-        file.seek(0);
-        return file;
-      }
-      return file;
-    } catch (IOException ex) {
-      file.seek(0);
-      return file;
-    }
-  }
-  
-  public synchronized void open(String address) throws IOException {
-    String filename = UUID.randomUUID().toString();
-    logger = StringUtil.join(Arrays.asList(address.split(":")), "+");
-    
-    log.debug("DfsLogger.open() begin");
-    VolumeManager fs = conf.getFileSystem();
-    
-    logPath = new Path(fs.choose(ServerConstants.getWalDirs()) + "/" + logger + "/" + filename);
-    try {
-      short replication = (short) conf.getConfiguration().getCount(Property.TSERV_WAL_REPLICATION);
-      if (replication == 0)
-        replication = fs.getDefaultReplication(logPath);
-      long blockSize = conf.getConfiguration().getMemoryInBytes(Property.TSERV_WAL_BLOCKSIZE);
-      if (blockSize == 0)
-        blockSize = (long) (conf.getConfiguration().getMemoryInBytes(Property.TSERV_WALOG_MAX_SIZE) * 1.1);
-      if (conf.getConfiguration().getBoolean(Property.TSERV_WAL_SYNC))
-        logFile = fs.createSyncable(logPath, 0, replication, blockSize);
-      else
-        logFile = fs.create(logPath, true, 0, replication, blockSize);
-      
-      try {
-        NoSuchMethodException e = null;
-        try {
-          // sync: send data to datanodes
-          sync = logFile.getClass().getMethod("sync");
-        } catch (NoSuchMethodException ex) {
-          e = ex;
-        }
-        try {
-          // hsync: send data to datanodes and sync the data to disk
-          sync = logFile.getClass().getMethod("hsync");
-          e = null;
-        } catch (NoSuchMethodException ex) {}
-        if (e != null)
-          throw new RuntimeException(e);
-      } catch (Exception e) {
-        throw new RuntimeException(e);
-      }
-      
-      // Initialize the crypto operations.
-      org.apache.accumulo.core.security.crypto.CryptoModule cryptoModule = org.apache.accumulo.core.security.crypto.CryptoModuleFactory.getCryptoModule(conf
-          .getConfiguration().get(Property.CRYPTO_MODULE_CLASS));
-      
-      // Initialize the log file with a header and the crypto params used to set up this log file.
-      logFile.write(LOG_FILE_HEADER_V2.getBytes());
-
-      CryptoModuleParameters params = CryptoModuleFactory.createParamsObjectFromAccumuloConfiguration(conf.getConfiguration());
-      
-      params.setPlaintextOutputStream(logFile);
-      
-      // In order to bootstrap the reading of this file later, we have to record the CryptoModule that was used to encipher it here,
-      // so that that crypto module can re-read its own parameters.
-      
-      logFile.writeUTF(conf.getConfiguration().get(Property.CRYPTO_MODULE_CLASS));
-      
-      
-      //@SuppressWarnings("deprecation")
-      //OutputStream encipheringOutputStream = cryptoModule.getEncryptingOutputStream(logFile, cryptoOpts);
-      params = cryptoModule.getEncryptingOutputStream(params);
-      OutputStream encipheringOutputStream = params.getEncryptedOutputStream();
-      
-      // If the module just kicks back our original stream, then just use it, don't wrap it in
-      // another data OutputStream.
-      if (encipheringOutputStream == logFile) {
-        encryptingLogFile = logFile;
-      } else {
-        encryptingLogFile = new DataOutputStream(encipheringOutputStream);
-      }
-      
-      LogFileKey key = new LogFileKey();
-      key.event = OPEN;
-      key.tserverSession = filename;
-      key.filename = filename;
-      write(key, EMPTY);
-      sync.invoke(logFile);
-      log.debug("Got new write-ahead log: " + this);
-    } catch (Exception ex) {
-      if (logFile != null)
-        logFile.close();
-      logFile = null;
-      throw new IOException(ex);
-    }
-    
-    Thread t = new Daemon(new LogSyncingTask());
-    t.setName("Accumulo WALog thread " + toString());
-    t.start();
-  }
-  
-  @Override
-  public String toString() {
-    return getLogger() + "/" + getFileName();
-  }
-  
-  public String getLogger() {
-    return logger;
-  }
-  
-  public String getFileName() {
-    return logPath.toString();
-  }
-  
-  public void close() throws IOException {
-    
-    synchronized (closeLock) {
-      if (closed)
-        return;
-      // after closed is set to true, nothing else should be added to the queue
-      // CLOSED_MARKER should be the last thing on the queue, therefore when the
-      // background thread sees the marker and exits there should be nothing else
-      // to process... so nothing should be left waiting for the background
-      // thread to do work
-      closed = true;
-      workQueue.add(CLOSED_MARKER);
-      while (!workQueue.isEmpty())
-        try {
-          closeLock.wait();
-        } catch (InterruptedException e) {
-          log.info("Interrupted");
-        }
-    }
-    
-    if (logFile != null)
-      try {
-        logFile.close();
-      } catch (IOException ex) {
-        log.error(ex);
-        throw new LogClosedException();
-      }
-  }
-  
-  public synchronized void defineTablet(int seq, int tid, KeyExtent tablet) throws IOException {
-    // write this log to the METADATA table
-    final LogFileKey key = new LogFileKey();
-    key.event = DEFINE_TABLET;
-    key.seq = seq;
-    key.tid = tid;
-    key.tablet = tablet;
-    try {
-      write(key, EMPTY);
-      sync.invoke(logFile);
-    } catch (Exception ex) {
-      log.error(ex);
-      throw new IOException(ex);
-    }
-  }
-  
-  /**
-   * @param key
-   * @param empty2
-   * @throws IOException
-   */
-  private synchronized void write(LogFileKey key, LogFileValue value) throws IOException {
-    key.write(encryptingLogFile);
-    value.write(encryptingLogFile);
-  }
-  
-  public LoggerOperation log(int seq, int tid, Mutation mutation) throws IOException {
-    return logManyTablets(Collections.singletonList(new TabletMutations(tid, seq, Collections.singletonList(mutation))));
-  }
-  
-  public LoggerOperation logManyTablets(List<TabletMutations> mutations) throws IOException {
-    DfsLogger.LogWork work = new DfsLogger.LogWork(mutations, new CountDownLatch(1));
-    
-    synchronized (DfsLogger.this) {
-      try {
-        for (TabletMutations tabletMutations : mutations) {
-          LogFileKey key = new LogFileKey();
-          key.event = MANY_MUTATIONS;
-          key.seq = tabletMutations.getSeq();
-          key.tid = tabletMutations.getTid();
-          LogFileValue value = new LogFileValue();
-          value.mutations = tabletMutations.getMutations();
-          write(key, value);
-        }
-      } catch (Exception e) {
-        log.error(e, e);
-        work.exception = e;
-      }
-    }
-    
-    synchronized (closeLock) {
-      // use a different lock for close check so that adding to work queue does not need
-      // to wait on walog I/O operations
-      
-      if (closed)
-        throw new LogClosedException();
-      workQueue.add(work);
-    }
-    
-    return new LoggerOperation(work);
-  }
-  
-  public synchronized void minorCompactionFinished(int seq, int tid, String fqfn) throws IOException {
-    LogFileKey key = new LogFileKey();
-    key.event = COMPACTION_FINISH;
-    key.seq = seq;
-    key.tid = tid;
-    try {
-      write(key, EMPTY);
-    } catch (IOException ex) {
-      log.error(ex);
-      throw ex;
-    }
-  }
-  
-  public synchronized void minorCompactionStarted(int seq, int tid, String fqfn) throws IOException {
-    LogFileKey key = new LogFileKey();
-    key.event = COMPACTION_START;
-    key.seq = seq;
-    key.tid = tid;
-    key.filename = fqfn;
-    try {
-      write(key, EMPTY);
-    } catch (IOException ex) {
-      log.error(ex);
-      throw ex;
-    }
-  }
-  
-}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/c3ddf9b6/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/log/LogSorter.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/log/LogSorter.java b/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/log/LogSorter.java
deleted file mode 100644
index 0a2ba12..0000000
--- a/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/log/LogSorter.java
+++ /dev/null
@@ -1,272 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.server.tabletserver.log;
-
-import java.io.DataInputStream;
-import java.io.EOFException;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.concurrent.ThreadPoolExecutor;
-
-import org.apache.accumulo.core.Constants;
-import org.apache.accumulo.core.client.Instance;
-import org.apache.accumulo.core.conf.AccumuloConfiguration;
-import org.apache.accumulo.core.conf.Property;
-import org.apache.accumulo.core.master.thrift.RecoveryStatus;
-import org.apache.accumulo.core.security.crypto.CryptoModuleFactory;
-import org.apache.accumulo.core.security.crypto.CryptoModuleParameters;
-import org.apache.accumulo.core.util.Pair;
-import org.apache.accumulo.core.util.SimpleThreadPool;
-import org.apache.accumulo.core.zookeeper.ZooUtil;
-import org.apache.accumulo.server.fs.VolumeManager;
-import org.apache.accumulo.server.zookeeper.DistributedWorkQueue;
-import org.apache.accumulo.server.zookeeper.DistributedWorkQueue.Processor;
-import org.apache.accumulo.tserver.logger.LogFileKey;
-import org.apache.accumulo.tserver.logger.LogFileValue;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.MapFile;
-import org.apache.log4j.Logger;
-import org.apache.zookeeper.KeeperException;
-
-/**
- * 
- */
-public class LogSorter {
-  
-  private static final Logger log = Logger.getLogger(LogSorter.class);
-  VolumeManager fs;
-  AccumuloConfiguration conf;
-  
-  private final Map<String,LogProcessor> currentWork = Collections.synchronizedMap(new HashMap<String,LogProcessor>());
-  
-  class LogProcessor implements Processor {
-    
-    private FSDataInputStream input;
-    private DataInputStream decryptingInput;
-    private long bytesCopied = -1;
-    private long sortStart = 0;
-    private long sortStop = -1;
-    
-    @Override
-    public Processor newProcessor() {
-      return new LogProcessor();
-    }
-    
-    @Override
-    public void process(String child, byte[] data) {
-      String work = new String(data);
-      String[] parts = work.split("\\|");
-      String src = parts[0];
-      String dest = parts[1];
-      String sortId = new Path(src).getName();
-      log.debug("Sorting " + src + " to " + dest + " using sortId " + sortId);
-      
-      synchronized (currentWork) {
-        if (currentWork.containsKey(sortId))
-          return;
-        currentWork.put(sortId, this);
-      }
-      
-      try {
-        log.info("Copying " + src + " to " + dest);
-        sort(sortId, new Path(src), dest);
-      } finally {
-        currentWork.remove(sortId);
-      }
-      
-    }
-    
-    public void sort(String name, Path srcPath, String destPath) {
-      
-      synchronized (this) {
-        sortStart = System.currentTimeMillis();
-      }
-      
-      String formerThreadName = Thread.currentThread().getName();
-      int part = 0;
-      try {
-        
-        // the following call does not throw an exception if the file/dir does not exist
-        fs.deleteRecursively(new Path(destPath));
-        
-        FSDataInputStream tmpInput = fs.open(srcPath);
-                
-        byte[] magic = DfsLogger.LOG_FILE_HEADER_V2.getBytes();
-        byte[] magicBuffer = new byte[magic.length];
-        tmpInput.readFully(magicBuffer);
-        if (!Arrays.equals(magicBuffer, magic)) {
-          tmpInput.seek(0);
-          synchronized (this) {
-           this.input = tmpInput;
-           this.decryptingInput = tmpInput;
-          }
-        } else {
-          // We read the crypto module class name here because we need to boot strap the class.  The class itself will read any 
-          // additional parameters it needs from the underlying stream.
-          String cryptoModuleClassname = tmpInput.readUTF();
-          org.apache.accumulo.core.security.crypto.CryptoModule cryptoModule = org.apache.accumulo.core.security.crypto.CryptoModuleFactory
-              .getCryptoModule(cryptoModuleClassname);
-          
-          // Create the parameters and set the input stream into those parameters
-          CryptoModuleParameters params = CryptoModuleFactory.createParamsObjectFromAccumuloConfiguration(conf);
-          params.setEncryptedInputStream(tmpInput);
-          
-          // Create the plaintext input stream from the encrypted one
-          params = cryptoModule.getDecryptingInputStream(params);
-          
-          // Store the plaintext input stream into member variables
-          synchronized (this) {
-            this.input = tmpInput;
-            
-            if (params.getPlaintextInputStream() instanceof DataInputStream) {
-              this.decryptingInput = (DataInputStream)params.getPlaintextInputStream();              
-            } else {
-              this.decryptingInput = new DataInputStream(params.getPlaintextInputStream());
-            }
-            
-          }
-          
-        }
-                
-        
-        final long bufferSize = conf.getMemoryInBytes(Property.TSERV_SORT_BUFFER_SIZE);
-        Thread.currentThread().setName("Sorting " + name + " for recovery");
-        while (true) {
-          final ArrayList<Pair<LogFileKey,LogFileValue>> buffer = new ArrayList<Pair<LogFileKey,LogFileValue>>();
-          try {
-            long start = input.getPos();
-            while (input.getPos() - start < bufferSize) {
-              LogFileKey key = new LogFileKey();
-              LogFileValue value = new LogFileValue();
-              key.readFields(decryptingInput);
-              value.readFields(decryptingInput);
-              buffer.add(new Pair<LogFileKey,LogFileValue>(key, value));
-            }
-            writeBuffer(destPath, buffer, part++);
-            buffer.clear();
-          } catch (EOFException ex) {
-            writeBuffer(destPath, buffer, part++);
-            break;
-          }
-        }
-        fs.create(new Path(destPath, "finished")).close();
-        log.info("Finished log sort " + name + " " + getBytesCopied() + " bytes " + part + " parts in " + getSortTime() + "ms");
-      } catch (Throwable t) {
-        try {
-          // parent dir may not exist
-          fs.mkdirs(new Path(destPath));
-          fs.create(new Path(destPath, "failed")).close();
-        } catch (IOException e) {
-          log.error("Error creating failed flag file " + name, e);
-        }
-        log.error(t, t);
-      } finally {
-        Thread.currentThread().setName(formerThreadName);
-        try {
-          close();
-        } catch (Exception e) {
-          log.error("Error during cleanup sort/copy " + name, e);
-        }
-        synchronized (this) {
-          sortStop = System.currentTimeMillis();
-        }
-      }
-    }
-    
-    private void writeBuffer(String destPath, ArrayList<Pair<LogFileKey,LogFileValue>> buffer, int part) throws IOException {
-      Path path = new Path(destPath, String.format("part-r-%05d", part++));
-      FileSystem ns = fs.getFileSystemByPath(path);
-      MapFile.Writer output = new MapFile.Writer(ns.getConf(), ns, path.toString(), LogFileKey.class, LogFileValue.class);
-      try {
-        Collections.sort(buffer, new Comparator<Pair<LogFileKey,LogFileValue>>() {
-          @Override
-          public int compare(Pair<LogFileKey,LogFileValue> o1, Pair<LogFileKey,LogFileValue> o2) {
-            return o1.getFirst().compareTo(o2.getFirst());
-          }
-        });
-        for (Pair<LogFileKey,LogFileValue> entry : buffer) {
-          output.append(entry.getFirst(), entry.getSecond());
-        }
-      } finally {
-        output.close();
-      }
-    }
-    
-    synchronized void close() throws IOException {
-      bytesCopied = input.getPos();
-      input.close();
-      decryptingInput.close();
-      input = null;
-    }
-    
-    public synchronized long getSortTime() {
-      if (sortStart > 0) {
-        if (sortStop > 0)
-          return sortStop - sortStart;
-        return System.currentTimeMillis() - sortStart;
-      }
-      return 0;
-    }
-    
-    synchronized long getBytesCopied() throws IOException {
-      return input == null ? bytesCopied : input.getPos();
-    }
-  }
-  
-  ThreadPoolExecutor threadPool;
-  private final Instance instance;
-  
-  public LogSorter(Instance instance, VolumeManager fs, AccumuloConfiguration conf) {
-    this.instance = instance;
-    this.fs = fs;
-    this.conf = conf;
-    int threadPoolSize = conf.getCount(Property.TSERV_RECOVERY_MAX_CONCURRENT);
-    this.threadPool = new SimpleThreadPool(threadPoolSize, this.getClass().getName());
-  }
-  
-  public void startWatchingForRecoveryLogs(ThreadPoolExecutor distWorkQThreadPool) throws KeeperException, InterruptedException {
-    this.threadPool = distWorkQThreadPool;
-    new DistributedWorkQueue(ZooUtil.getRoot(instance) + Constants.ZRECOVERY).startProcessing(new LogProcessor(), this.threadPool);
-  }
-  
-  public List<RecoveryStatus> getLogSorts() {
-    List<RecoveryStatus> result = new ArrayList<RecoveryStatus>();
-    synchronized (currentWork) {
-      for (Entry<String,LogProcessor> entries : currentWork.entrySet()) {
-        RecoveryStatus status = new RecoveryStatus();
-        status.name = entries.getKey();
-        try {
-          status.progress = entries.getValue().getBytesCopied() / (0.0 + conf.getMemoryInBytes(Property.TSERV_WALOG_MAX_SIZE));
-        } catch (IOException ex) {
-          log.warn("Error getting bytes read");
-        }
-        status.runtime = (int) entries.getValue().getSortTime();
-        result.add(status);
-      }
-      return result;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/c3ddf9b6/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/log/MultiReader.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/log/MultiReader.java b/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/log/MultiReader.java
deleted file mode 100644
index 77ddc66..0000000
--- a/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/log/MultiReader.java
+++ /dev/null
@@ -1,167 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.server.tabletserver.log;
-
-import java.io.EOFException;
-import java.io.IOException;
-
-import org.apache.accumulo.server.fs.VolumeManager;
-import org.apache.commons.collections.buffer.PriorityBuffer;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.DataInputBuffer;
-import org.apache.hadoop.io.DataOutputBuffer;
-import org.apache.hadoop.io.MapFile.Reader;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
-/**
- * Provide simple Map.Reader methods over multiple Maps.
- * 
- * Presently only supports next() and seek() and works on all the Map directories within a directory. The primary purpose of this class is to merge the results
- * of multiple Reduce jobs that result in Map output files.
- */
-@SuppressWarnings({"rawtypes", "unchecked"})
-public class MultiReader {
-  
-  /**
-   * Group together the next key/value from a Reader with the Reader
-   * 
-   */
-  private static class Index implements Comparable<Index> {
-    Reader reader;
-    WritableComparable key;
-    Writable value;
-    boolean cached = false;
-    
-    private static Object create(java.lang.Class<?> klass) {
-      try {
-        return klass.getConstructor().newInstance();
-      } catch (Throwable t) {
-        throw new RuntimeException("Unable to construct objects to use for comparison");
-      }
-    }
-    
-    public Index(Reader reader) {
-      this.reader = reader;
-      key = (WritableComparable) create(reader.getKeyClass());
-      value = (Writable) create(reader.getValueClass());
-    }
-    
-    private void cache() throws IOException {
-      if (!cached && reader.next(key, value)) {
-        cached = true;
-      }
-    }
-    
-    public int compareTo(Index o) {
-      try {
-        cache();
-        o.cache();
-        // no more data: always goes to the end
-        if (!cached)
-          return 1;
-        if (!o.cached)
-          return -1;
-        return key.compareTo(o.key);
-      } catch (IOException ex) {
-        throw new RuntimeException(ex);
-      }
-    }
-  }
-  
-  private PriorityBuffer heap = new PriorityBuffer();
-  
-  public MultiReader(VolumeManager fs, Path directory) throws IOException {
-    boolean foundFinish = false;
-    for (FileStatus child : fs.listStatus(directory)) {
-      if (child.getPath().getName().startsWith("_"))
-        continue;
-      if (child.getPath().getName().equals("finished")) {
-        foundFinish = true;
-        continue;
-      }
-      FileSystem ns = fs.getFileSystemByPath(child.getPath());
-      heap.add(new Index(new Reader(ns, child.getPath().toString(), ns.getConf())));
-    }
-    if (!foundFinish)
-      throw new IOException("Sort \"finished\" flag not found in " + directory);
-  }
-  
-  private static void copy(Writable src, Writable dest) throws IOException {
-    // not exactly efficient...
-    DataOutputBuffer output = new DataOutputBuffer();
-    src.write(output);
-    DataInputBuffer input = new DataInputBuffer();
-    input.reset(output.getData(), output.getLength());
-    dest.readFields(input);
-  }
-  
-  public synchronized boolean next(WritableComparable key, Writable val) throws IOException {
-    Index elt = (Index) heap.remove();
-    try {
-      elt.cache();
-      if (elt.cached) {
-        copy(elt.key, key);
-        copy(elt.value, val);
-        elt.cached = false;
-      } else {
-        return false;
-      }
-    } finally {
-      heap.add(elt);
-    }
-    return true;
-  }
-  
-  public synchronized boolean seek(WritableComparable key) throws IOException {
-    PriorityBuffer reheap = new PriorityBuffer(heap.size());
-    boolean result = false;
-    for (Object obj : heap) {
-      Index index = (Index) obj;
-      try {
-        WritableComparable found = index.reader.getClosest(key, index.value, true);
-        if (found != null && found.equals(key)) {
-          result = true;
-        }
-      } catch (EOFException ex) {
-        // thrown if key is beyond all data in the map
-      }
-      index.cached = false;
-      reheap.add(index);
-    }
-    heap = reheap;
-    return result;
-  }
-  
-  public void close() throws IOException {
-    IOException problem = null;
-    for (Object obj : heap) {
-      Index index = (Index) obj;
-      try {
-        index.reader.close();
-      } catch (IOException ex) {
-        problem = ex;
-      }
-    }
-    if (problem != null)
-      throw problem;
-    heap = null;
-  }
-  
-}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/c3ddf9b6/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/log/MutationReceiver.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/log/MutationReceiver.java b/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/log/MutationReceiver.java
deleted file mode 100644
index c455397..0000000
--- a/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/log/MutationReceiver.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.server.tabletserver.log;
-
-import org.apache.accumulo.core.data.Mutation;
-
-public interface MutationReceiver {
-  void receive(Mutation m);
-}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/c3ddf9b6/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/log/SortedLogRecovery.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/log/SortedLogRecovery.java b/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/log/SortedLogRecovery.java
deleted file mode 100644
index 789fe31..0000000
--- a/server/tserver/src/main/java/org/apache/accumulo/server/tabletserver/log/SortedLogRecovery.java
+++ /dev/null
@@ -1,241 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.server.tabletserver.log;
-
-import static org.apache.accumulo.tserver.logger.LogEvents.COMPACTION_FINISH;
-import static org.apache.accumulo.tserver.logger.LogEvents.COMPACTION_START;
-import static org.apache.accumulo.tserver.logger.LogEvents.DEFINE_TABLET;
-import static org.apache.accumulo.tserver.logger.LogEvents.MANY_MUTATIONS;
-import static org.apache.accumulo.tserver.logger.LogEvents.MUTATION;
-import static org.apache.accumulo.tserver.logger.LogEvents.OPEN;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Set;
-
-import org.apache.accumulo.core.data.KeyExtent;
-import org.apache.accumulo.core.data.Mutation;
-import org.apache.accumulo.server.fs.VolumeManager;
-import org.apache.accumulo.tserver.logger.LogFileKey;
-import org.apache.accumulo.tserver.logger.LogFileValue;
-import org.apache.hadoop.fs.Path;
-import org.apache.log4j.Logger;
-
-/**
- * Extract Mutations for a tablet from a set of logs that have been sorted by operation and tablet.
- * 
- */
-public class SortedLogRecovery {
-  private static final Logger log = Logger.getLogger(SortedLogRecovery.class);
-  
-  static class EmptyMapFileException extends Exception {
-    private static final long serialVersionUID = 1L;
-
-    public EmptyMapFileException() { super(); }
-  }
-
-  static class UnusedException extends Exception {
-    private static final long serialVersionUID = 1L;
-
-    public UnusedException() { super(); }
-  }
-
-  private VolumeManager fs;
-
-  public SortedLogRecovery(VolumeManager fs) {
-    this.fs = fs;
-  }
-  
-  private enum Status {
-    INITIAL, LOOKING_FOR_FINISH, COMPLETE
-  };
-  
-  private static class LastStartToFinish {
-    long lastStart = -1;
-    long seq = -1;
-    long lastFinish = -1;
-    Status compactionStatus = Status.INITIAL;
-    String tserverSession = "";
-    
-    private void update(long newFinish) {
-      this.seq = this.lastStart;
-      if (newFinish != -1)
-        lastFinish = newFinish;
-    }
-    
-    private void update(int newStartFile, long newStart) {
-      this.lastStart = newStart;
-    }
-    
-    private void update(String newSession) {
-      this.lastStart = -1;
-      this.lastFinish = -1;
-      this.compactionStatus = Status.INITIAL;
-      this.tserverSession = newSession;
-    }
-  }
-  
-  public void recover(KeyExtent extent, List<Path> recoveryLogs, Set<String> tabletFiles, MutationReceiver mr) throws IOException {
-    int[] tids = new int[recoveryLogs.size()];
-    LastStartToFinish lastStartToFinish = new LastStartToFinish();
-    for (int i = 0; i < recoveryLogs.size(); i++) {
-      Path logfile = recoveryLogs.get(i);
-      log.info("Looking at mutations from " + logfile + " for " + extent);
-      MultiReader reader = new MultiReader(fs, logfile);
-      try {
-        try {
-          tids[i] = findLastStartToFinish(reader, i, extent, tabletFiles, lastStartToFinish);
-        } catch (EmptyMapFileException ex) {
-          log.info("Ignoring empty map file " + logfile);
-          tids[i] = -1;
-        } catch (UnusedException ex) {
-          log.info("Ignoring log file " + logfile + " appears to be unused by " + extent);
-          tids[i] = -1;
-        }
-      } finally {
-        try {
-          reader.close();
-        } catch (IOException ex) {
-          log.warn("Ignoring error closing file");
-        }
-      }
-      
-    }
-    
-    if (lastStartToFinish.compactionStatus == Status.LOOKING_FOR_FINISH)
-      throw new RuntimeException("COMPACTION_FINISH (without preceding COMPACTION_START) not followed by successful minor compaction");
-    
-    for (int i = 0; i < recoveryLogs.size(); i++) {
-      Path logfile = recoveryLogs.get(i);
-      MultiReader reader = new MultiReader(fs, logfile);
-      try {
-        playbackMutations(reader, tids[i], lastStartToFinish, mr);
-      } finally {
-        try {
-          reader.close();
-        } catch (IOException ex) {
-          log.warn("Ignoring error closing file");
-        }
-      }
-      log.info("Recovery complete for " + extent + " using " + logfile);
-    }
-  }
-  
-  int findLastStartToFinish(MultiReader reader, int fileno, KeyExtent extent, Set<String> tabletFiles, LastStartToFinish lastStartToFinish) throws IOException, EmptyMapFileException, UnusedException {
-    // Scan for tableId for this extent (should always be in the log)
-    LogFileKey key = new LogFileKey();
-    LogFileValue value = new LogFileValue();
-    int tid = -1;
-    if (!reader.next(key, value))
-      throw new EmptyMapFileException();
-    if (key.event != OPEN)
-      throw new RuntimeException("First log entry value is not OPEN");
-    
-    if (key.tserverSession.compareTo(lastStartToFinish.tserverSession) != 0) {
-      if (lastStartToFinish.compactionStatus == Status.LOOKING_FOR_FINISH)
-        throw new RuntimeException("COMPACTION_FINISH (without preceding COMPACTION_START) is not followed by a successful minor compaction.");
-      lastStartToFinish.update(key.tserverSession);
-    }
-    
-    LogFileKey defineKey = null;
-    
-    // find the maximum tablet id... because a tablet may leave a tserver and then come back, in which case it would have a different tablet id
-    // for the maximum tablet id, find the minimum sequence #... may be ok to find the max seq, but just want to make the code behave like it used to
-    while (reader.next(key, value)) {
-      // LogReader.printEntry(entry);
-      if (key.event != DEFINE_TABLET)
-        break;
-      if (key.tablet.equals(extent)) {
-        if (tid != key.tid) {
-          tid = key.tid;
-          defineKey = key;
-          key = new LogFileKey();
-        }
-      }
-    }
-    if (tid < 0) {
-      throw new UnusedException();
-    }
-    
-    log.debug("Found tid, seq " + tid + " " + defineKey.seq);
-    
-    // Scan start/stop events for this tablet
-    key = defineKey;
-    key.event = COMPACTION_START;
-    reader.seek(key);
-    while (reader.next(key, value)) {
-      // LogFileEntry.printEntry(entry);
-      if (key.tid != tid)
-        break;
-      if (key.event == COMPACTION_START) {
-        if (lastStartToFinish.compactionStatus == Status.INITIAL)
-          lastStartToFinish.compactionStatus = Status.COMPLETE;
-        if (key.seq <= lastStartToFinish.lastStart)
-          throw new RuntimeException("Sequence numbers are not increasing for start/stop events.");
-        lastStartToFinish.update(fileno, key.seq);
-        
-        // Tablet server finished the minor compaction, but didn't remove the entry from the METADATA table.
-        log.error("filename in compaction start " + key.filename);
-        if (tabletFiles.contains(key.filename))
-          lastStartToFinish.update(-1);
-      } else if (key.event == COMPACTION_FINISH) {
-        if (key.seq <= lastStartToFinish.lastStart)
-          throw new RuntimeException("Sequence numbers are not increasing for start/stop events.");
-        if (lastStartToFinish.compactionStatus == Status.INITIAL)
-          lastStartToFinish.compactionStatus = Status.LOOKING_FOR_FINISH;
-        else if (lastStartToFinish.lastFinish > lastStartToFinish.lastStart)
-          throw new RuntimeException("COMPACTION_FINISH does not have preceding COMPACTION_START event.");
-        else
-          lastStartToFinish.compactionStatus = Status.COMPLETE;
-        lastStartToFinish.update(key.seq);
-      } else
-        break;
-    }
-    return tid;
-  }
-  
-  private void playbackMutations(MultiReader reader, int tid, LastStartToFinish lastStartToFinish, MutationReceiver mr) throws IOException {
-    LogFileKey key = new LogFileKey();
-    LogFileValue value = new LogFileValue();
-    
-    // Playback mutations after the last stop to finish
-    log.info("Scanning for mutations starting at sequence number " + lastStartToFinish.seq + " for tid " + tid);
-    key.event = MUTATION;
-    key.tid = tid;
-    // the seq number for the minor compaction start is now the same as the
-    // last update made to memory. Scan up to that mutation, but not past it.
-    key.seq = lastStartToFinish.seq;
-    reader.seek(key);
-    while (true) {
-      if (!reader.next(key, value))
-        break;
-      if (key.tid != tid)
-        break;
-      // log.info("Replaying " + key);
-      // log.info(value);
-      if (key.event == MUTATION) {
-        mr.receive(value.mutations.get(0));
-      } else if (key.event == MANY_MUTATIONS) {
-        for (Mutation m : value.mutations) {
-          mr.receive(m);
-        }
-      } else {
-        throw new RuntimeException("unexpected log key type: " + key.event);
-      }
-    }
-  }
-}


Mime
View raw message