accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From e..@apache.org
Subject [06/10] ACCUMULO-2041 extract tablet classes to new files, move tablet-related code to o.a.a.tserver.tablet, make member variables private
Date Tue, 03 Jun 2014 18:49:17 GMT
http://git-wip-us.apache.org/repos/asf/accumulo/blob/7db2abf1/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletIteratorEnvironment.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletIteratorEnvironment.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletIteratorEnvironment.java
index e13594d..d1fece5 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletIteratorEnvironment.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletIteratorEnvironment.java
@@ -42,7 +42,7 @@ public class TabletIteratorEnvironment implements IteratorEnvironment {
   private final ArrayList<SortedKeyValueIterator<Key,Value>> topLevelIterators = new ArrayList<SortedKeyValueIterator<Key,Value>>();
   private Map<FileRef,DataFileValue> files;
   
-  TabletIteratorEnvironment(IteratorScope scope, AccumuloConfiguration config) {
+  public TabletIteratorEnvironment(IteratorScope scope, AccumuloConfiguration config) {
     if (scope == IteratorScope.majc)
       throw new IllegalArgumentException("must set if compaction is full");
     
@@ -52,7 +52,7 @@ public class TabletIteratorEnvironment implements IteratorEnvironment {
     this.fullMajorCompaction = false;
   }
   
-  TabletIteratorEnvironment(IteratorScope scope, AccumuloConfiguration config, ScanFileManager trm, Map<FileRef,DataFileValue> files) {
+  public TabletIteratorEnvironment(IteratorScope scope, AccumuloConfiguration config, ScanFileManager trm, Map<FileRef,DataFileValue> files) {
     if (scope == IteratorScope.majc)
       throw new IllegalArgumentException("must set if compaction is full");
     
@@ -63,7 +63,7 @@ public class TabletIteratorEnvironment implements IteratorEnvironment {
     this.files = files;
   }
   
-  TabletIteratorEnvironment(IteratorScope scope, boolean fullMajC, AccumuloConfiguration config) {
+  public TabletIteratorEnvironment(IteratorScope scope, boolean fullMajC, AccumuloConfiguration config) {
     if (scope != IteratorScope.majc)
       throw new IllegalArgumentException("Tried to set maj compaction type when scope was " + scope);
     
@@ -101,7 +101,7 @@ public class TabletIteratorEnvironment implements IteratorEnvironment {
     topLevelIterators.add(iter);
   }
   
-  SortedKeyValueIterator<Key,Value> getTopLevelIterator(SortedKeyValueIterator<Key,Value> iter) {
+  public SortedKeyValueIterator<Key,Value> getTopLevelIterator(SortedKeyValueIterator<Key,Value> iter) {
     if (topLevelIterators.isEmpty())
       return iter;
     ArrayList<SortedKeyValueIterator<Key,Value>> allIters = new ArrayList<SortedKeyValueIterator<Key,Value>>(topLevelIterators);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7db2abf1/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index 144d59b..1c07c44 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -195,17 +195,7 @@ import org.apache.accumulo.trace.instrument.Span;
 import org.apache.accumulo.trace.instrument.Trace;
 import org.apache.accumulo.trace.instrument.thrift.TraceWrap;
 import org.apache.accumulo.trace.thrift.TInfo;
-import org.apache.accumulo.tserver.Compactor.CompactionInfo;
 import org.apache.accumulo.tserver.RowLocks.RowLock;
-import org.apache.accumulo.tserver.Tablet.CommitSession;
-import org.apache.accumulo.tserver.Tablet.KVEntry;
-import org.apache.accumulo.tserver.Tablet.LookupResult;
-import org.apache.accumulo.tserver.Tablet.MinorCompactionReason;
-import org.apache.accumulo.tserver.Tablet.ScanBatch;
-import org.apache.accumulo.tserver.Tablet.Scanner;
-import org.apache.accumulo.tserver.Tablet.SplitInfo;
-import org.apache.accumulo.tserver.Tablet.TConstraintViolationException;
-import org.apache.accumulo.tserver.Tablet.TabletClosedException;
 import org.apache.accumulo.tserver.TabletServerResourceManager.TabletResourceManager;
 import org.apache.accumulo.tserver.TabletStatsKeeper.Operation;
 import org.apache.accumulo.tserver.compaction.MajorCompactionReason;
@@ -221,6 +211,17 @@ import org.apache.accumulo.tserver.metrics.TabletServerMBean;
 import org.apache.accumulo.tserver.metrics.TabletServerMinCMetrics;
 import org.apache.accumulo.tserver.metrics.TabletServerScanMetrics;
 import org.apache.accumulo.tserver.metrics.TabletServerUpdateMetrics;
+import org.apache.accumulo.tserver.tablet.CommitSession;
+import org.apache.accumulo.tserver.tablet.CompactionInfo;
+import org.apache.accumulo.tserver.tablet.CompactionWatcher;
+import org.apache.accumulo.tserver.tablet.Compactor;
+import org.apache.accumulo.tserver.tablet.KVEntry;
+import org.apache.accumulo.tserver.tablet.Tablet.LookupResult;
+import org.apache.accumulo.tserver.tablet.ScanBatch;
+import org.apache.accumulo.tserver.tablet.Scanner;
+import org.apache.accumulo.tserver.tablet.SplitInfo;
+import org.apache.accumulo.tserver.tablet.Tablet;
+import org.apache.accumulo.tserver.tablet.TabletClosedException;
 import org.apache.commons.collections.map.LRUMap;
 import org.apache.hadoop.fs.FSError;
 import org.apache.hadoop.fs.FileSystem;
@@ -252,7 +253,10 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
 
   private TabletServerLogger logger;
 
-  protected TabletServerMinCMetrics mincMetrics = new TabletServerMinCMetrics();
+  protected final TabletServerMinCMetrics mincMetrics = new TabletServerMinCMetrics();
+  public TabletServerMinCMetrics getMinCMetrics() {
+    return mincMetrics;
+  }
 
   private ServerConfiguration serverConfig;
   private LogSorter logSorter = null;
@@ -629,7 +633,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
     }
   }
 
-  static class TservConstraintEnv implements Environment {
+  public static class TservConstraintEnv implements Environment {
 
     private TCredentials credentials;
     private SecurityOperation security;
@@ -641,7 +645,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
       this.credentials = credentials;
     }
 
-    void setExtent(KeyExtent ke) {
+    public void setExtent(KeyExtent ke) {
       this.ke = ke;
     }
 
@@ -1659,16 +1663,16 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
 
             commitSession.commit(mutations);
 
-            Tablet tablet = commitSession.getTablet();
+            KeyExtent extent = commitSession.getExtent();
 
-            if (tablet == us.currentTablet) {
+            if (extent == us.currentTablet.getExtent()) {
               // because constraint violations may filter out some
               // mutations, for proper
               // accounting with the client code, need to increment
               // the count based
               // on the original number of mutations from the client
               // NOT the filtered number
-              us.successfulCommits.increment(tablet, us.queuedMutations.get(tablet).size());
+              us.successfulCommits.increment(us.currentTablet, us.queuedMutations.get(us.currentTablet).size());
             }
           }
           long t2 = System.currentTimeMillis();
@@ -2141,7 +2145,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
         KeyExtent ke = entry.getKey();
         if (ke.getTableId().compareTo(text) == 0) {
           Tablet tablet = entry.getValue();
-          TabletStats stats = tablet.timer.getTabletStats();
+          TabletStats stats = tablet.getTabletStats();
           stats.extent = ke.toThrift();
           stats.ingestRate = tablet.ingestRate();
           stats.queryRate = tablet.queryRate();
@@ -2563,11 +2567,11 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
     }
   }
 
-  boolean isMajorCompactionDisabled() {
+  public boolean isMajorCompactionDisabled() {
     return majorCompactorDisabled;
   }
 
-  void executeSplit(Tablet tablet) {
+  public void executeSplit(Tablet tablet) {
     resourceManager.executeSplit(tablet.getExtent(), new LoggingRunnable(log, new SplitRunner(tablet)));
   }
 
@@ -2617,7 +2621,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
             }
 
             synchronized (tablet) {
-              if (tablet.initiateMajorCompaction(MajorCompactionReason.NORMAL) || tablet.majorCompactionQueued() || tablet.majorCompactionRunning()) {
+              if (tablet.initiateMajorCompaction(MajorCompactionReason.NORMAL) || tablet.isMajorCompactionQueued() || tablet.isMajorCompactionRunning()) {
                 numMajorCompactionsInProgress++;
                 continue;
               }
@@ -2683,16 +2687,15 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
 
     Entry<KeyExtent,SplitInfo> first = tabletInfo.firstEntry();
     TabletResourceManager newTrm0 = resourceManager.createTabletResourceManager(first.getKey(), getTableConfiguration(first.getKey()));
-    newTablets[0] = new Tablet(first.getKey(), TabletServer.this, newTrm0, first.getValue());
+    newTablets[0] = new Tablet(TabletServer.this, first.getKey(), newTrm0, first.getValue());
 
     Entry<KeyExtent,SplitInfo> last = tabletInfo.lastEntry();
     TabletResourceManager newTrm1 = resourceManager.createTabletResourceManager(last.getKey(), getTableConfiguration(last.getKey()));
-    newTablets[1] = new Tablet(last.getKey(), TabletServer.this, newTrm1, last.getValue());
+    newTablets[1] = new Tablet(TabletServer.this, last.getKey(), newTrm1, last.getValue());
 
     // roll tablet stats over into tablet server's statsKeeper object as
     // historical data
-    statsKeeper.saveMinorTimes(tablet.timer);
-    statsKeeper.saveMajorTimes(tablet.timer);
+    statsKeeper.saveMajorMinorTimes(tablet.getTabletStats());
 
     // lose the reference to the old tablet and open two new ones
     synchronized (onlineTablets) {
@@ -2719,7 +2722,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
   private BlockingDeque<MasterMessage> masterMessages = new LinkedBlockingDeque<MasterMessage>();
 
   // add a message for the main thread to send back to the master
-  void enqueueMasterMessage(MasterMessage m) {
+  public void enqueueMasterMessage(MasterMessage m) {
     masterMessages.addLast(m);
   }
 
@@ -2808,9 +2811,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
 
       // roll tablet stats over into tablet server's statsKeeper object as
       // historical data
-      statsKeeper.saveMinorTimes(t.timer);
-      statsKeeper.saveMajorTimes(t.timer);
-
+      statsKeeper.saveMajorMinorTimes(t.getTabletStats());
       log.info("unloaded " + extent);
 
     }
@@ -2914,7 +2915,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
         // this opens the tablet file and fills in the endKey in the
         // extent
         locationToOpen = VolumeUtil.switchRootTabletVolume(extent, locationToOpen);
-        tablet = new Tablet(TabletServer.this, locationToOpen, extent, trm, tabletsKeyValues);
+        tablet = new Tablet(TabletServer.this, extent, locationToOpen, trm, tabletsKeyValues);
         /*
          * If a minor compaction starts after a tablet opens, this indicates a log recovery occurred. This recovered data must be minor compacted.
          *
@@ -3018,7 +3019,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
 
   private static ObjectName OBJECT_NAME = null;
 
-  static AtomicLong seekCount = new AtomicLong(0);
+  public static final AtomicLong seekCount = new AtomicLong(0);
 
   public TabletStatsKeeper getStatsKeeper() {
     return statsKeeper;
@@ -3098,7 +3099,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
     return address;
   }
 
-  ZooLock getLock() {
+  public ZooLock getLock() {
     return tabletServerLock;
   }
 
@@ -3452,7 +3453,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
     return clientAddress.getHostText() + ":" + clientAddress.getPort();
   }
 
-  TServerInstance getTabletSession() {
+  public TServerInstance getTabletSession() {
     String address = getClientAddressString();
     if (address == null)
       return null;
@@ -3596,13 +3597,13 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
       table.scanRate += tablet.scanRate();
       long recsInMemory = tablet.getNumEntriesInMemory();
       table.recsInMemory += recsInMemory;
-      if (tablet.minorCompactionRunning())
+      if (tablet.isMinorCompactionRunning())
         table.minors.running++;
-      if (tablet.minorCompactionQueued())
+      if (tablet.isMinorCompactionQueued())
         table.minors.queued++;
-      if (tablet.majorCompactionRunning())
+      if (tablet.isMajorCompactionRunning())
         table.majors.running++;
-      if (tablet.majorCompactionQueued())
+      if (tablet.isMajorCompactionQueued())
         table.majors.queued++;
     }
 
@@ -3775,7 +3776,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
     if (this.isEnabled()) {
       int result = 0;
       for (Tablet tablet : Collections.unmodifiableCollection(onlineTablets.values())) {
-        if (tablet.majorCompactionQueued())
+        if (tablet.isMajorCompactionQueued())
           result++;
       }
       return result;
@@ -3788,7 +3789,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
     if (this.isEnabled()) {
       int result = 0;
       for (Tablet tablet : Collections.unmodifiableCollection(onlineTablets.values())) {
-        if (tablet.minorCompactionRunning())
+        if (tablet.isMinorCompactionRunning())
           result++;
       }
       return result;
@@ -3801,7 +3802,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
     if (this.isEnabled()) {
       int result = 0;
       for (Tablet tablet : Collections.unmodifiableCollection(onlineTablets.values())) {
-        if (tablet.minorCompactionQueued())
+        if (tablet.isMinorCompactionQueued())
           result++;
       }
       return result;

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7db2abf1/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
index f26c74b..095f8d5 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
@@ -54,11 +54,11 @@ import org.apache.accumulo.server.tabletserver.TabletState;
 import org.apache.accumulo.server.util.time.SimpleTimer;
 import org.apache.accumulo.trace.instrument.TraceExecutorService;
 import org.apache.accumulo.tserver.FileManager.ScanFileManager;
-import org.apache.accumulo.tserver.Tablet.MinorCompactionReason;
 import org.apache.accumulo.tserver.compaction.CompactionStrategy;
 import org.apache.accumulo.tserver.compaction.DefaultCompactionStrategy;
 import org.apache.accumulo.tserver.compaction.MajorCompactionReason;
 import org.apache.accumulo.tserver.compaction.MajorCompactionRequest;
+import org.apache.accumulo.tserver.tablet.Tablet;
 import org.apache.log4j.Logger;
 
 /**
@@ -69,34 +69,34 @@ import org.apache.log4j.Logger;
  */
 public class TabletServerResourceManager {
 
-  private ExecutorService minorCompactionThreadPool;
-  private ExecutorService majorCompactionThreadPool;
-  private ExecutorService rootMajorCompactionThreadPool;
-  private ExecutorService defaultMajorCompactionThreadPool;
-  private ExecutorService splitThreadPool;
-  private ExecutorService defaultSplitThreadPool;
-  private ExecutorService defaultMigrationPool;
-  private ExecutorService migrationPool;
-  private ExecutorService assignmentPool;
-  private ExecutorService assignMetaDataPool;
-  private ExecutorService readAheadThreadPool;
-  private ExecutorService defaultReadAheadThreadPool;
-  private Map<String,ExecutorService> threadPools = new TreeMap<String,ExecutorService>();
+  private static final Logger log = Logger.getLogger(TabletServerResourceManager.class);
+
+  private final ExecutorService minorCompactionThreadPool;
+  private final ExecutorService majorCompactionThreadPool;
+  private final ExecutorService rootMajorCompactionThreadPool;
+  private final ExecutorService defaultMajorCompactionThreadPool;
+  private final ExecutorService splitThreadPool;
+  private final ExecutorService defaultSplitThreadPool;
+  private final ExecutorService defaultMigrationPool;
+  private final ExecutorService migrationPool;
+  private final ExecutorService assignmentPool;
+  private final ExecutorService assignMetaDataPool;
+  private final ExecutorService readAheadThreadPool;
+  private final ExecutorService defaultReadAheadThreadPool;
+  private final Map<String,ExecutorService> threadPools = new TreeMap<String,ExecutorService>();
 
   private final VolumeManager fs;
 
-  private FileManager fileManager;
+  private final FileManager fileManager;
 
-  private MemoryManager memoryManager;
+  private final MemoryManager memoryManager;
 
-  private MemoryManagementFramework memMgmt;
+  private final MemoryManagementFramework memMgmt;
 
   private final LruBlockCache _dCache;
   private final LruBlockCache _iCache;
   private final ServerConfiguration conf;
 
-  private static final Logger log = Logger.getLogger(TabletServerResourceManager.class);
-
   private ExecutorService addEs(String name, ExecutorService tp) {
     if (threadPools.containsKey(name)) {
       throw new IllegalArgumentException("Cannot create two executor services with same name " + name);
@@ -210,10 +210,10 @@ public class TabletServerResourceManager {
 
   private static class TabletStateImpl implements TabletState, Cloneable {
 
-    private long lct;
-    private Tablet tablet;
-    private long mts;
-    private long mcmts;
+    private final long lct;
+    private final Tablet tablet;
+    private final long mts;
+    private final long mcmts;
 
     public TabletStateImpl(Tablet t, long mts, long lct, long mcmts) {
       this.tablet = t;
@@ -249,11 +249,12 @@ public class TabletServerResourceManager {
 
   private class MemoryManagementFramework {
     private final Map<KeyExtent,TabletStateImpl> tabletReports;
-    private LinkedBlockingQueue<TabletStateImpl> memUsageReports;
+    private final LinkedBlockingQueue<TabletStateImpl> memUsageReports;
     private long lastMemCheckTime = System.currentTimeMillis();
     private long maxMem;
-    private Thread memoryGuardThread;
-    private Thread minorCompactionInitiatorThread;
+    private long lastMemTotal = 0;
+    private final Thread memoryGuardThread;
+    private final Thread minorCompactionInitiatorThread;
 
     MemoryManagementFramework() {
       tabletReports = Collections.synchronizedMap(new HashMap<KeyExtent,TabletStateImpl>());
@@ -287,8 +288,6 @@ public class TabletServerResourceManager {
       minorCompactionInitiatorThread.start();
     }
 
-    private long lastMemTotal = 0;
-
     private void processTabletMemStats() {
       while (true) {
         try {
@@ -494,7 +493,7 @@ public class TabletServerResourceManager {
       lastReportedCommitTime = System.currentTimeMillis();
     }
 
-    synchronized ScanFileManager newScanFileManager() {
+    public synchronized ScanFileManager newScanFileManager() {
       if (closed)
         throw new IllegalStateException("closed");
       return fileManager.newScanFileManager(extent);
@@ -504,8 +503,8 @@ public class TabletServerResourceManager {
 
     // BEGIN methods that Tablets call to manage memory
 
-    private AtomicLong lastReportedSize = new AtomicLong();
-    private AtomicLong lastReportedMincSize = new AtomicLong();
+    private final AtomicLong lastReportedSize = new AtomicLong();
+    private final AtomicLong lastReportedMincSize = new AtomicLong();
     private volatile long lastReportedCommitTime = 0;
 
     public void updateMemoryUsageStats(Tablet tablet, long size, long mincSize) {
@@ -544,7 +543,7 @@ public class TabletServerResourceManager {
     // BEGIN methods that Tablets call to make decisions about major compaction
     // when too many files are open, we may want tablets to compact down
     // to one map file
-    boolean needsMajorCompaction(SortedMap<FileRef,DataFileValue> tabletFiles, MajorCompactionReason reason) {
+    public boolean needsMajorCompaction(SortedMap<FileRef,DataFileValue> tabletFiles, MajorCompactionReason reason) {
       if (closed)
         return false;// throw new IOException("closed");
 
@@ -585,11 +584,11 @@ public class TabletServerResourceManager {
     // tablets call this method to run minor compactions,
     // this allows us to control how many minor compactions
     // run concurrently in a tablet server
-    void executeMinorCompaction(final Runnable r) {
+    public void executeMinorCompaction(final Runnable r) {
       minorCompactionThreadPool.execute(new LoggingRunnable(log, r));
     }
 
-    void close() throws IOException {
+    public void close() throws IOException {
       // always obtain locks in same order to avoid deadlock
       synchronized (TabletServerResourceManager.this) {
         synchronized (this) {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7db2abf1/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletStatsKeeper.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletStatsKeeper.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletStatsKeeper.java
index 58e16be..d914ac6 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletStatsKeeper.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletStatsKeeper.java
@@ -81,6 +81,11 @@ public class TabletStatsKeeper {
     
   }
   
+  public void saveMajorMinorTimes(TabletStats t) {
+    ActionStatsUpdator.update(minor, t.minors);
+    ActionStatsUpdator.update(major, t.majors);
+  }
+  
   public void saveMinorTimes(TabletStatsKeeper t) {
     ActionStatsUpdator.update(minor, t.minor);
   }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7db2abf1/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
index 871f4ae..9fec437 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
@@ -36,10 +36,10 @@ import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.util.UtilWaitThread;
 import org.apache.accumulo.server.conf.TableConfiguration;
 import org.apache.accumulo.server.fs.VolumeManager;
-import org.apache.accumulo.tserver.Tablet.CommitSession;
 import org.apache.accumulo.tserver.TabletMutations;
 import org.apache.accumulo.tserver.TabletServer;
 import org.apache.accumulo.tserver.log.DfsLogger.LoggerOperation;
+import org.apache.accumulo.tserver.tablet.CommitSession;
 import org.apache.hadoop.fs.Path;
 import org.apache.log4j.Logger;
 
@@ -81,7 +81,7 @@ public class TabletServerLogger {
   }
 
   private static boolean enabled(CommitSession commitSession) {
-    return enabled(commitSession.getTablet().getTableConfiguration());
+    return commitSession.getUseWAL();
   }
 
   static private abstract class TestCallWithWriteLock {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7db2abf1/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Batch.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Batch.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Batch.java
new file mode 100644
index 0000000..73434c6
--- /dev/null
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Batch.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.tserver.tablet;
+
+import java.util.List;
+
+import org.apache.accumulo.core.data.Key;
+
+class Batch {
+  final boolean skipContinueKey;
+  final List<KVEntry> results;
+  final Key continueKey;
+  final long numBytes;
+  
+  Batch(boolean skipContinueKey, List<KVEntry> results, Key continueKey, long numBytes) {
+    this.skipContinueKey = skipContinueKey;
+    this.results = results;
+    this.continueKey = continueKey;
+    this.numBytes = numBytes;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7db2abf1/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CommitSession.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CommitSession.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CommitSession.java
new file mode 100644
index 0000000..6402797
--- /dev/null
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CommitSession.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.tserver.tablet;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.tserver.InMemoryMap;
+import org.apache.accumulo.tserver.log.DfsLogger;
+import org.apache.log4j.Logger;
+
+public class CommitSession {
+  
+  private static final Logger log = Logger.getLogger(CommitSession.class);
+
+  private final int seq;
+  private final InMemoryMap memTable;
+  private final TabletCommitter committer;
+
+  private int commitsInProgress;
+  private long maxCommittedTime = Long.MIN_VALUE;
+
+  CommitSession(TabletCommitter committer, int seq, InMemoryMap imm) {
+    this.seq = seq;
+    this.memTable = imm;
+    this.committer = committer;
+    commitsInProgress = 0;
+  }
+
+  public int getWALogSeq() {
+    return seq;
+  }
+
+  public void decrementCommitsInProgress() {
+    if (commitsInProgress < 1)
+      throw new IllegalStateException("commitsInProgress = " + commitsInProgress);
+
+    commitsInProgress--;
+    if (commitsInProgress == 0)
+      committer.notifyAll();
+  }
+
+  public void incrementCommitsInProgress() {
+    if (commitsInProgress < 0)
+      throw new IllegalStateException("commitsInProgress = " + commitsInProgress);
+
+    commitsInProgress++;
+  }
+
+  public void waitForCommitsToFinish() {
+    while (commitsInProgress > 0) {
+      try {
+        committer.wait(50);
+      } catch (InterruptedException e) {
+        log.warn(e, e);
+      }
+    }
+  }
+
+  public void abortCommit(List<Mutation> value) {
+    committer.abortCommit(this, value);
+  }
+
+  public void commit(List<Mutation> mutations) {
+    committer.commit(this, mutations);
+  }
+
+  public TabletCommitter getTablet() {
+    return committer;
+  }
+
+  public boolean beginUpdatingLogsUsed(ArrayList<DfsLogger> copy, boolean mincFinish) {
+    return committer.beginUpdatingLogsUsed(memTable, copy, mincFinish);
+  }
+
+  public void finishUpdatingLogsUsed() {
+    committer.finishUpdatingLogsUsed();
+  }
+
+  public int getLogId() {
+    return committer.getLogId();
+  }
+
+  public KeyExtent getExtent() {
+    return committer.getExtent();
+  }
+
+  public void updateMaxCommittedTime(long time) {
+    maxCommittedTime = Math.max(time, maxCommittedTime);
+  }
+
+  public long getMaxCommittedTime() {
+    if (maxCommittedTime == Long.MIN_VALUE)
+      throw new IllegalStateException("Tried to read max committed time when it was never set");
+    return maxCommittedTime;
+  }
+
+  public boolean getUseWAL() {
+    return committer.getUseWAL();
+  }
+
+  public void mutate(List<Mutation> mutations) {
+    memTable.mutate(mutations);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7db2abf1/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactionInfo.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactionInfo.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactionInfo.java
new file mode 100644
index 0000000..ab57d65
--- /dev/null
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactionInfo.java
@@ -0,0 +1,113 @@
+package org.apache.accumulo.tserver.tablet;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.data.thrift.IterInfo;
+import org.apache.accumulo.core.tabletserver.thrift.ActiveCompaction;
+import org.apache.accumulo.core.tabletserver.thrift.CompactionReason;
+import org.apache.accumulo.core.tabletserver.thrift.CompactionType;
+import org.apache.accumulo.server.fs.FileRef;
+
+/**
+ * A view of a running {@link Compactor}.
+ * <p>
+ * The current locality group and the entries-read / entries-written counters are copied from the compactor when this object is constructed; every other
+ * value is read from the compactor at the time the accessor is called.
+ */
+public class CompactionInfo {
+
+  private final Compactor compactor;
+  private final String localityGroup;
+  private final long entriesRead;
+  private final long entriesWritten;
+
+  /**
+   * Captures the compactor's current locality group and progress counters.
+   *
+   * @param compactor
+   *          the compactor being described
+   */
+  CompactionInfo(Compactor compactor) {
+    this.localityGroup = compactor.getCurrentLocalityGroup();
+    this.entriesRead = compactor.getEntriesRead();
+    this.entriesWritten = compactor.getEntriesWritten();
+    this.compactor = compactor;
+  }
+
+  /**
+   * @return the compactor's id
+   */
+  public long getID() {
+    return compactor.getCompactorID();
+  }
+
+  /**
+   * @return the extent the compactor is working on
+   */
+  public KeyExtent getExtent() {
+    return compactor.getExtent();
+  }
+
+  /**
+   * @return the entries-read counter as it was when this object was constructed
+   */
+  public long getEntriesRead() {
+    return entriesRead;
+  }
+
+  /**
+   * @return the entries-written counter as it was when this object was constructed
+   */
+  public long getEntriesWritten() {
+    return entriesWritten;
+  }
+
+  /**
+   * @return the compactor's {@code thread} field as it is right now; may be {@code null}
+   */
+  public Thread getThread() {
+    return compactor.thread;
+  }
+
+  /**
+   * @return the compactor's output file as a string
+   */
+  public String getOutputFile() {
+    return compactor.getOutputFile();
+  }
+
+  /**
+   * Builds the thrift description of the compaction.
+   * <p>
+   * The reported age is the time elapsed between the compactor's start time and this call.
+   *
+   * @return a new {@link ActiveCompaction} populated from the compactor and from the values captured at construction
+   */
+  public ActiveCompaction toThrift() {
+    List<IterInfo> iiList = new ArrayList<IterInfo>();
+    Map<String,Map<String,String>> iterOptions = new HashMap<String,Map<String,String>>();
+    for (IteratorSetting iterSetting : compactor.getIterators()) {
+      iiList.add(new IterInfo(iterSetting.getPriority(), iterSetting.getIteratorClass(), iterSetting.getName()));
+      iterOptions.put(iterSetting.getName(), iterSetting.getOptions());
+    }
+
+    List<String> filesToCompact = new ArrayList<String>();
+    for (FileRef ref : compactor.getFilesToCompact()) {
+      filesToCompact.add(ref.toString());
+    }
+
+    CompactionType type = thriftType();
+    CompactionReason reason = thriftReason();
+
+    return new ActiveCompaction(compactor.getExtent().toThrift(), System.currentTimeMillis() - compactor.getStartTime(), filesToCompact,
+        compactor.getOutputFile(), type, reason, localityGroup, entriesRead, entriesWritten, iiList, iterOptions);
+  }
+
+  /**
+   * Classifies the compaction: with an in-memory map it is MERGE when files are also being compacted and MINOR otherwise; without one it is FULL when
+   * deletes are not propagated and MAJOR otherwise.
+   */
+  private CompactionType thriftType() {
+    if (compactor.hasIMM()) {
+      if (compactor.getFilesToCompact().size() > 0) {
+        return CompactionType.MERGE;
+      }
+      return CompactionType.MINOR;
+    }
+    if (!compactor.willPropogateDeletes()) {
+      return CompactionType.FULL;
+    }
+    return CompactionType.MAJOR;
+  }
+
+  /**
+   * Maps the compactor's minor reason (when it has an in-memory map) or major reason (otherwise) to the thrift enum; anything without a dedicated case
+   * is reported as SYSTEM.
+   */
+  private CompactionReason thriftReason() {
+    if (compactor.hasIMM()) {
+      switch (compactor.getMinCReason()) {
+        case USER:
+          return CompactionReason.USER;
+        case CLOSE:
+          return CompactionReason.CLOSE;
+        case SYSTEM:
+        default:
+          return CompactionReason.SYSTEM;
+      }
+    }
+
+    switch (compactor.getMajorCompactionReason()) {
+      case USER:
+        return CompactionReason.USER;
+      case CHOP:
+        return CompactionReason.CHOP;
+      case IDLE:
+        return CompactionReason.IDLE;
+      case NORMAL:
+      default:
+        return CompactionReason.SYSTEM;
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7db2abf1/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactionRunner.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactionRunner.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactionRunner.java
new file mode 100644
index 0000000..de5a66d
--- /dev/null
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactionRunner.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.tserver.tablet;
+
+import org.apache.accumulo.tserver.compaction.MajorCompactionReason;
+
+/**
+ * A queued major compaction of one tablet. Instances are ordered by {@link #compareTo(CompactionRunner)} so the compaction queue can prioritise them.
+ */
+class CompactionRunner implements Runnable, Comparable<CompactionRunner> {
+
+  private final Tablet tablet;
+  private final MajorCompactionReason reason;
+  // wall-clock time, in milliseconds, at which this runner was created
+  private final long queued;
+
+  /**
+   * @param tablet
+   *          the tablet to compact
+   * @param reason
+   *          why the compaction was requested
+   */
+  public CompactionRunner(Tablet tablet, MajorCompactionReason reason) {
+    this.tablet = tablet;
+    queued = System.currentTimeMillis();
+    this.reason = reason;
+  }
+
+  /**
+   * Runs the major compaction, unless major compactions are disabled on the tablet server, in which case the queued reason is simply removed. After a
+   * NORMAL compaction another one is initiated if the tablet still needs it.
+   */
+  @Override
+  public void run() {
+    if (tablet.getTabletServer().isMajorCompactionDisabled()) {
+      // this will make compaction tasks that were queued when shutdown was
+      // initiated exit
+      tablet.removeMajorCompactionQueuedReason(reason);
+      return;
+    }
+
+    tablet.majorCompact(reason, queued);
+
+    // if there is more work to be done, queue another major compaction
+    synchronized (tablet) {
+      if (reason == MajorCompactionReason.NORMAL && tablet.needsMajorCompaction(reason))
+        tablet.initiateMajorCompaction(reason);
+    }
+  }
+
+  // We used to synchronize on the Tablet before fetching this information,
+  // but this method is called by the compaction queue thread to re-order the compactions.
+  // The compaction queue holds a lock during this sort.
+  // A tablet lock can be held while putting itself on the queue, so we can't lock the tablet
+  // while pulling information used to sort the tablets in the queue, or we may get deadlocked.
+  // See ACCUMULO-1110.
+  private int getNumFiles() {
+    return tablet.getDatafileManager().getNumFiles();
+  }
+
+  /**
+   * Orders runners by reason first. For USER and CHOP compactions the one queued earlier sorts first. Remaining ties are broken by file count, with the
+   * tablet that has more files sorting first.
+   * <p>
+   * All comparisons are done without subtraction: narrowing {@code queued - o.queued} to an int can flip the sign, or yield 0 for unequal values, once
+   * the difference no longer fits in 32 bits, which would violate the {@link Comparable} contract.
+   */
+  @Override
+  public int compareTo(CompactionRunner o) {
+    int cmp = reason.compareTo(o.reason);
+    if (cmp != 0)
+      return cmp;
+
+    if (reason == MajorCompactionReason.USER || reason == MajorCompactionReason.CHOP) {
+      // for these types of compactions want to do the oldest first
+      if (queued != o.queued)
+        return queued < o.queued ? -1 : 1;
+    }
+
+    // more files sorts first
+    int otherFiles = o.getNumFiles();
+    int myFiles = this.getNumFiles();
+    if (otherFiles == myFiles)
+      return 0;
+    return otherFiles < myFiles ? -1 : 1;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7db2abf1/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactionStats.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactionStats.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactionStats.java
new file mode 100644
index 0000000..69832e9
--- /dev/null
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactionStats.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.tserver.tablet;
+
+/**
+ * Mutable tally for a compaction: number of entries read, number of entries written, and a file size that is set separately.
+ */
+public class CompactionStats {
+  private long entriesRead;
+  private long entriesWritten;
+  private long fileSize;
+
+  /**
+   * Creates a tally with all values at zero.
+   */
+  public CompactionStats() {}
+
+  /**
+   * @param er
+   *          initial entries-read count
+   * @param ew
+   *          initial entries-written count
+   */
+  CompactionStats(long er, long ew) {
+    entriesRead = er;
+    entriesWritten = ew;
+  }
+
+  /**
+   * @return the number of entries read
+   */
+  public long getEntriesRead() {
+    return entriesRead;
+  }
+
+  /**
+   * @return the number of entries written
+   */
+  public long getEntriesWritten() {
+    return entriesWritten;
+  }
+
+  /**
+   * Adds the read and written counts of {@code mcs} to this tally. The file size is not touched.
+   *
+   * @param mcs
+   *          the tally to fold into this one
+   */
+  public void add(CompactionStats mcs) {
+    entriesRead = entriesRead + mcs.entriesRead;
+    entriesWritten = entriesWritten + mcs.entriesWritten;
+  }
+
+  /**
+   * @return the file size last passed to {@link #setFileSize(long)}, or zero if it was never set
+   */
+  public long getFileSize() {
+    return fileSize;
+  }
+
+  /**
+   * @param fileSize
+   *          the file size to record
+   */
+  public void setFileSize(long fileSize) {
+    this.fileSize = fileSize;
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7db2abf1/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactionWatcher.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactionWatcher.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactionWatcher.java
new file mode 100644
index 0000000..1ca1f33
--- /dev/null
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactionWatcher.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.tserver.tablet;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.server.util.time.SimpleTimer;
+import org.apache.log4j.Logger;
+
+/**
+ * 
+ */
+public class CompactionWatcher implements Runnable {
+  private final Map<List<Long>,ObservedCompactionInfo> observedCompactions = new HashMap<List<Long>,ObservedCompactionInfo>();
+  private final AccumuloConfiguration config;
+  private static boolean watching = false;
+  
+  private static class ObservedCompactionInfo {
+    CompactionInfo compactionInfo;
+    long firstSeen;
+    boolean loggedWarning;
+    
+    ObservedCompactionInfo(CompactionInfo ci, long time) {
+      this.compactionInfo = ci;
+      this.firstSeen = time;
+    }
+  }
+
+  public CompactionWatcher(AccumuloConfiguration config) {
+    this.config = config;
+  }
+
+  public void run() {
+    List<CompactionInfo> runningCompactions = Compactor.getRunningCompactions();
+    
+    Set<List<Long>> newKeys = new HashSet<List<Long>>();
+    
+    long time = System.currentTimeMillis();
+
+    for (CompactionInfo ci : runningCompactions) {
+      List<Long> compactionKey = Arrays.asList(ci.getID(), ci.getEntriesRead(), ci.getEntriesWritten());
+      newKeys.add(compactionKey);
+      
+      if (!observedCompactions.containsKey(compactionKey)) {
+        observedCompactions.put(compactionKey, new ObservedCompactionInfo(ci, time));
+      }
+    }
+    
+    // look for compactions that finished or made progress and logged a warning
+    HashMap<List<Long>,ObservedCompactionInfo> copy = new HashMap<List<Long>,ObservedCompactionInfo>(observedCompactions);
+    copy.keySet().removeAll(newKeys);
+    
+    for (ObservedCompactionInfo oci : copy.values()) {
+      if (oci.loggedWarning) {
+        Logger.getLogger(CompactionWatcher.class).info("Compaction of " + oci.compactionInfo.getExtent() + " is no longer stuck");
+      }
+    }
+
+    // remove any compaction that completed or made progress
+    observedCompactions.keySet().retainAll(newKeys);
+    
+    long warnTime = config.getTimeInMillis(Property.TSERV_COMPACTION_WARN_TIME);
+
+    // check for stuck compactions
+    for (ObservedCompactionInfo oci : observedCompactions.values()) {
+      if (time - oci.firstSeen > warnTime && !oci.loggedWarning) {
+        Thread compactionThread = oci.compactionInfo.getThread();
+        if (compactionThread != null) {
+          StackTraceElement[] trace = compactionThread.getStackTrace();
+          Exception e = new Exception("Possible stack trace of compaction stuck on " + oci.compactionInfo.getExtent());
+          e.setStackTrace(trace);
+          Logger.getLogger(CompactionWatcher.class).warn(
+              "Compaction of " + oci.compactionInfo.getExtent() + " to " + oci.compactionInfo.getOutputFile() + " has not made progress for at least "
+                  + (time - oci.firstSeen) + "ms", e);
+          oci.loggedWarning = true;
+        }
+      }
+    }
+  }
+
+  public static synchronized void startWatching(AccumuloConfiguration config) {
+    if (!watching) {
+      SimpleTimer.getInstance(config).schedule(new CompactionWatcher(config), 10000, 10000);
+      watching = true;
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7db2abf1/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Compactor.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Compactor.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Compactor.java
new file mode 100644
index 0000000..2a3e2f4
--- /dev/null
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Compactor.java
@@ -0,0 +1,477 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.tserver.tablet;
+
+import java.io.IOException;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.file.FileOperations;
+import org.apache.accumulo.core.file.FileSKVIterator;
+import org.apache.accumulo.core.file.FileSKVWriter;
+import org.apache.accumulo.core.iterators.IteratorEnvironment;
+import org.apache.accumulo.core.iterators.IteratorUtil;
+import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.iterators.WrappingIterator;
+import org.apache.accumulo.core.iterators.system.ColumnFamilySkippingIterator;
+import org.apache.accumulo.core.iterators.system.DeletingIterator;
+import org.apache.accumulo.core.iterators.system.MultiIterator;
+import org.apache.accumulo.core.iterators.system.TimeSettingIterator;
+import org.apache.accumulo.core.metadata.schema.DataFileValue;
+import org.apache.accumulo.core.util.LocalityGroupUtil;
+import org.apache.accumulo.core.util.LocalityGroupUtil.LocalityGroupConfigurationError;
+import org.apache.accumulo.server.fs.FileRef;
+import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.server.problems.ProblemReport;
+import org.apache.accumulo.server.problems.ProblemReportingIterator;
+import org.apache.accumulo.server.problems.ProblemReports;
+import org.apache.accumulo.server.problems.ProblemType;
+import org.apache.accumulo.trace.instrument.Span;
+import org.apache.accumulo.trace.instrument.Trace;
+import org.apache.accumulo.tserver.InMemoryMap;
+import org.apache.accumulo.tserver.MinorCompactionReason;
+import org.apache.accumulo.tserver.TabletIteratorEnvironment;
+import org.apache.accumulo.tserver.compaction.MajorCompactionReason;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.log4j.Logger;
+
+/**
+ * Performs one compaction: reads the entries of a set of data files (and, when present, an in-memory map) through the configured iterator stack and
+ * writes the result to a single output file.
+ * <p>
+ * While {@link #call()} is executing, the instance is registered in a static set so that {@link #getRunningCompactions()} can report on it.
+ */
+public class Compactor implements Callable<CompactionStats> {
+  private static final Logger log = Logger.getLogger(Compactor.class);
+  private static final AtomicLong nextCompactorID = new AtomicLong(0);
+
+  /**
+   * Iterator wrapper that counts calls to {@link #next()}.
+   * <p>
+   * Each instance keeps its own count and, every 1024 calls, adds 1024 to a shared {@link AtomicLong}. Deep copies share that counter and register
+   * themselves in the list owned by the iterator they were copied from.
+   */
+  public static class CountingIterator extends WrappingIterator {
+
+    private long count;
+    private final ArrayList<CountingIterator> deepCopies;
+    private final AtomicLong entriesRead;
+
+    @Override
+    public CountingIterator deepCopy(IteratorEnvironment env) {
+      return new CountingIterator(this, env);
+    }
+
+    // copy constructor used by deepCopy: shares the deep-copy list and the counter with "other" and adds itself to that list
+    private CountingIterator(CountingIterator other, IteratorEnvironment env) {
+      setSource(other.getSource().deepCopy(env));
+      count = 0;
+      this.deepCopies = other.deepCopies;
+      this.entriesRead = other.entriesRead;
+      deepCopies.add(this);
+    }
+
+    /**
+     * @param source
+     *          the iterator to wrap
+     * @param entriesRead
+     *          counter that receives the batched updates from {@link #next()}
+     */
+    public CountingIterator(SortedKeyValueIterator<Key,Value> source, AtomicLong entriesRead) {
+      deepCopies = new ArrayList<Compactor.CountingIterator>();
+      this.setSource(source);
+      count = 0;
+      this.entriesRead = entriesRead;
+    }
+
+    /**
+     * Not supported; the source is supplied through the constructor instead.
+     */
+    @Override
+    public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void next() throws IOException {
+      super.next();
+      count++;
+      // publish to the shared counter in batches of 1024; a remainder below 1024 is never added to it
+      if (count % 1024 == 0) {
+        entriesRead.addAndGet(1024);
+      }
+    }
+
+    /**
+     * @return this iterator's own count plus the counts of every iterator in the shared deep-copy list
+     */
+    public long getCount() {
+      long sum = 0;
+      for (CountingIterator dc : deepCopies) {
+        sum += dc.count;
+      }
+
+      return count + sum;
+    }
+  }
+
+
+  /**
+   * Thrown by the compaction when it stops because the {@link CompactionEnv} reports that compaction is no longer enabled.
+   */
+  public static class CompactionCanceledException extends Exception {
+    private static final long serialVersionUID = 1L;
+  }
+
+  /**
+   * Callbacks the compactor uses to ask whether it may keep running and which iterator scope applies.
+   */
+  public interface CompactionEnv {
+    
+    boolean isCompactionEnabled();
+
+    IteratorScope getIteratorScope();
+  }
+
+  private final Map<FileRef,DataFileValue> filesToCompact;
+  // may be null; hasIMM() reports whether it is set
+  private final InMemoryMap imm;
+  private final FileRef outputFile;
+  private final boolean propogateDeletes;
+  private final AccumuloConfiguration acuTableConf;
+  private final CompactionEnv env;
+  private final VolumeManager fs;
+  protected final KeyExtent extent;
+  private final List<IteratorSetting> iterators;
+
+  // things to report
+  private String currentLocalityGroup = "";
+  private final long startTime;
+
+  // used as an index into MajorCompactionReason.values() or MinorCompactionReason.values(), depending on which getter is called
+  private int reason;
+
+  private final AtomicLong entriesRead = new AtomicLong(0);
+  private final AtomicLong entriesWritten = new AtomicLong(0);
+  private final DateFormat dateFormatter = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss.SSS");
+
+  // a unique id to identify a compactor
+  private final long compactorID = nextCompactorID.getAndIncrement();
+  // the thread executing call(); set at the start of call() and reset to null in its finally block
+  protected volatile Thread thread;
+
+  public long getCompactorID() { return compactorID; }
+
+  private synchronized void setLocalityGroup(String name) {
+    this.currentLocalityGroup = name;
+  }
+  
+  /**
+   * @return the name of the locality group currently being compacted; the empty string before any group was started and while the default group is
+   *         processed
+   */
+  public synchronized String getCurrentLocalityGroup() {
+    return currentLocalityGroup;
+  }
+
+  private void clearStats() {
+    entriesRead.set(0);
+    entriesWritten.set(0);
+  }
+
+  // compactors that are currently inside call()
+  protected static final Set<Compactor> runningCompactions = Collections.synchronizedSet(new HashSet<Compactor>());
+
+  /**
+   * @return a new list holding one {@link CompactionInfo} per compactor registered as running at the time of the call
+   */
+  public static List<CompactionInfo> getRunningCompactions() {
+    ArrayList<CompactionInfo> compactions = new ArrayList<CompactionInfo>();
+
+    // iterating a Collections.synchronizedSet requires holding its lock manually
+    synchronized (runningCompactions) {
+      for (Compactor compactor : runningCompactions) {
+        compactions.add(new CompactionInfo(compactor));
+      }
+    }
+
+    return compactions;
+  }
+
+  /**
+   * Stores the arguments and records the current time as the start time.
+   *
+   * @param fs
+   *          used to resolve the file systems of the input and output files and to delete the output on failure
+   * @param files
+   *          the files to compact, with their metadata
+   * @param imm
+   *          in-memory map to include in the compaction, or null
+   * @param outputFile
+   *          the file to write
+   * @param propogateDeletes
+   *          passed to the {@link DeletingIterator}; its negation is passed to the majc iterator environment
+   * @param acuTableConf
+   *          configuration used for file operations, locality groups and iterators
+   * @param extent
+   *          the extent being compacted; its data range bounds the entries read
+   * @param env
+   *          callbacks for cancellation and iterator scope
+   * @param iterators
+   *          additional iterator settings passed to the iterator loader
+   * @param reason
+   *          index of the compaction reason, see {@link #getMajorCompactionReason()} and {@link #getMinCReason()}
+   */
+  public Compactor(VolumeManager fs, Map<FileRef,DataFileValue> files, InMemoryMap imm, FileRef outputFile, boolean propogateDeletes,
+      AccumuloConfiguration acuTableConf, KeyExtent extent, CompactionEnv env, List<IteratorSetting> iterators, int reason) {
+    this.extent = extent;
+    this.fs = fs;
+    this.filesToCompact = files;
+    this.imm = imm;
+    this.outputFile = outputFile;
+    this.propogateDeletes = propogateDeletes;
+    this.acuTableConf = acuTableConf;
+    this.env = env;
+    this.iterators = iterators;
+    this.reason = reason;
+
+    startTime = System.currentTimeMillis();
+  }
+
+  public VolumeManager getFileSystem() {
+    return fs;
+  }
+
+  KeyExtent getExtent() {
+    return extent;
+  }
+
+  String getOutputFile() {
+    return outputFile.toString();
+  }
+
+  // interprets the stored int as an index into MajorCompactionReason.values()
+  MajorCompactionReason getMajorCompactionReason() { return MajorCompactionReason.values()[reason]; }
+
+  /**
+   * Runs the compaction on the calling thread.
+   * <p>
+   * The output writer is opened, every configured locality group is compacted (when the writer supports locality groups), then the default locality
+   * group, and finally the writer is closed and the output file is re-opened once as a check. If the writer was not closed successfully by the time the
+   * method exits, it is closed and the output file deleted on a best-effort basis.
+   *
+   * @return the accumulated read/written counts plus the size of the output file
+   * @throws IOException
+   *           if reading, writing or verifying fails, or if the locality group configuration is invalid
+   * @throws CompactionCanceledException
+   *           if the environment disabled compaction while entries were still pending
+   */
+  @Override
+  public CompactionStats call() throws IOException, CompactionCanceledException {
+
+    FileSKVWriter mfw = null;
+
+    CompactionStats majCStats = new CompactionStats();
+
+    // true only if this call added the instance to the set; only then does the finally block deregister it
+    boolean remove = runningCompactions.add(this);
+
+    clearStats();
+
+    // rename the thread for the duration of the compaction; the old name is restored in finally
+    String oldThreadName = Thread.currentThread().getName();
+    String newThreadName = "MajC compacting " + extent.toString() + " started " + dateFormatter.format(new Date()) + " file: " + outputFile;
+    Thread.currentThread().setName(newThreadName);
+    thread = Thread.currentThread();
+    try {
+      FileOperations fileFactory = FileOperations.getInstance();
+      FileSystem ns = this.fs.getVolumeByPath(outputFile.path()).getFileSystem();
+      mfw = fileFactory.openWriter(outputFile.path().toString(), ns, ns.getConf(), acuTableConf);
+
+      Map<String,Set<ByteSequence>> lGroups;
+      try {
+        lGroups = LocalityGroupUtil.getLocalityGroups(acuTableConf);
+      } catch (LocalityGroupConfigurationError e) {
+        throw new IOException(e);
+      }
+
+      long t1 = System.currentTimeMillis();
+
+      // column families already written as part of a named locality group; excluded from the default group below
+      HashSet<ByteSequence> allColumnFamilies = new HashSet<ByteSequence>();
+
+      if (mfw.supportsLocalityGroups()) {
+        for (Entry<String,Set<ByteSequence>> entry : lGroups.entrySet()) {
+          setLocalityGroup(entry.getKey());
+          compactLocalityGroup(entry.getKey(), entry.getValue(), true, mfw, majCStats);
+          allColumnFamilies.addAll(entry.getValue());
+        }
+      }
+
+      setLocalityGroup("");
+      compactLocalityGroup(null, allColumnFamilies, false, mfw, majCStats);
+
+      long t2 = System.currentTimeMillis();
+
+      FileSKVWriter mfwTmp = mfw;
+      mfw = null; // set this to null so we do not try to close it again in finally if the close fails
+      mfwTmp.close(); // if the close fails it will cause the compaction to fail
+
+      // Verify the file, since hadoop 0.20.2 sometimes lies about the success of close()
+      try {
+        FileSKVIterator openReader = fileFactory.openReader(outputFile.path().toString(), false, ns, ns.getConf(), acuTableConf);
+        openReader.close();
+      } catch (IOException ex) {
+        log.error("Verification of successful compaction fails!!! " + extent + " " + outputFile, ex);
+        throw ex;
+      }
+
+      log.debug(String.format("Compaction %s %,d read | %,d written | %,6d entries/sec | %6.3f secs", extent, majCStats.getEntriesRead(),
+          majCStats.getEntriesWritten(), (int) (majCStats.getEntriesRead() / ((t2 - t1) / 1000.0)), (t2 - t1) / 1000.0));
+
+      majCStats.setFileSize(fileFactory.getFileSize(outputFile.path().toString(), ns, ns.getConf(), acuTableConf));
+      return majCStats;
+    } catch (IOException e) {
+      log.error(e, e);
+      throw e;
+    } catch (RuntimeException e) {
+      log.error(e, e);
+      throw e;
+    } finally {
+      Thread.currentThread().setName(oldThreadName);
+      if (remove) {
+        thread = null;
+        runningCompactions.remove(this);
+      }
+
+      // cleanup failures are only logged so that they do not replace the exception (or result) of the try block
+      try {
+        if (mfw != null) {
+          // compaction must not have finished successfully, so close its output file
+          try {
+            mfw.close();
+          } finally {
+            if (!fs.deleteRecursively(outputFile.path()))
+              if (fs.exists(outputFile.path()))
+                log.error("Unable to delete " + outputFile);
+          }
+        }
+      } catch (IOException e) {
+        log.warn(e, e);
+      } catch (RuntimeException exception) {
+        log.warn(exception, exception);
+      }
+    }
+  }
+
+  /**
+   * Opens a reader for every file to compact and wraps each one in a {@link ProblemReportingIterator}, plus a {@link TimeSettingIterator} when the
+   * file's metadata has a time set.
+   * <p>
+   * If any file fails to open, a problem report is filed, all readers opened so far are closed and removed from {@code readers}, and the failure is
+   * rethrown as an {@link IOException}.
+   *
+   * @param lgName
+   *          not used by this method
+   * @param readers
+   *          receives every reader that was opened, so that the caller can close them
+   * @return one iterator per file
+   */
+  private List<SortedKeyValueIterator<Key,Value>> openMapDataFiles(String lgName, ArrayList<FileSKVIterator> readers) throws IOException {
+
+    List<SortedKeyValueIterator<Key,Value>> iters = new ArrayList<SortedKeyValueIterator<Key,Value>>(filesToCompact.size());
+
+    for (FileRef mapFile : filesToCompact.keySet()) {
+      try {
+
+        FileOperations fileFactory = FileOperations.getInstance();
+        // note: this local shadows the VolumeManager field of the same name
+        FileSystem fs = this.fs.getVolumeByPath(mapFile.path()).getFileSystem();
+        FileSKVIterator reader;
+
+        reader = fileFactory.openReader(mapFile.path().toString(), false, fs, fs.getConf(), acuTableConf);
+
+        readers.add(reader);
+
+        SortedKeyValueIterator<Key,Value> iter = new ProblemReportingIterator(extent.getTableId().toString(), mapFile.path().toString(), false, reader);
+
+        if (filesToCompact.get(mapFile).isTimeSet()) {
+          iter = new TimeSettingIterator(iter, filesToCompact.get(mapFile).getTime());
+        }
+
+        iters.add(iter);
+
+      } catch (Throwable e) {
+
+        ProblemReports.getInstance().report(new ProblemReport(extent.getTableId().toString(), ProblemType.FILE_READ, mapFile.path().toString(), e));
+
+        log.warn("Some problem opening map file " + mapFile + " " + e.getMessage(), e);
+        // failed to open some map file... close the ones that were opened
+        for (FileSKVIterator reader : readers) {
+          try {
+            reader.close();
+          } catch (Throwable e2) {
+            log.warn("Failed to close map file", e2);
+          }
+        }
+
+        readers.clear();
+
+        if (e instanceof IOException)
+          throw (IOException) e;
+        throw new IOException("Failed to open map data files", e);
+      }
+    }
+
+    return iters;
+  }
+
+  /**
+   * Compacts one locality group into the writer and adds the group's read/written counts to {@code majCStats}.
+   *
+   * @param lgName
+   *          name of the locality group to start in the writer; ignored when {@code inclusive} is false
+   * @param columnFamilies
+   *          the column families passed to the seek
+   * @param inclusive
+   *          true to seek with {@code columnFamilies} as an inclusive set and write a named locality group; false to seek with it as an exclusive set
+   *          and write the default locality group
+   * @param mfw
+   *          the output writer
+   * @param majCStats
+   *          accumulator that receives this group's counts, also when the method exits with an exception from the write loop
+   * @throws CompactionCanceledException
+   *           if compaction was disabled while entries were still pending; the writer is closed and the output file deleted first
+   */
+  private void compactLocalityGroup(String lgName, Set<ByteSequence> columnFamilies, boolean inclusive, FileSKVWriter mfw, CompactionStats majCStats)
+      throws IOException, CompactionCanceledException {
+    ArrayList<FileSKVIterator> readers = new ArrayList<FileSKVIterator>(filesToCompact.size());
+    Span span = Trace.start("compact");
+    try {
+      long entriesCompacted = 0;
+      List<SortedKeyValueIterator<Key,Value>> iters = openMapDataFiles(lgName, readers);
+
+      if (imm != null) {
+        iters.add(imm.compactionIterator());
+      }
+
+      // system iterator stack: merge all sources, count what is read, handle deletes, then skip column families
+      CountingIterator citr = new CountingIterator(new MultiIterator(iters, extent.toDataRange()), entriesRead);
+      DeletingIterator delIter = new DeletingIterator(citr, propogateDeletes);
+      ColumnFamilySkippingIterator cfsi = new ColumnFamilySkippingIterator(delIter);
+
+      // if(env.getIteratorScope() )
+
+      TabletIteratorEnvironment iterEnv;
+      if (env.getIteratorScope() == IteratorScope.majc)
+        iterEnv = new TabletIteratorEnvironment(IteratorScope.majc, !propogateDeletes, acuTableConf);
+      else if (env.getIteratorScope() == IteratorScope.minc)
+        iterEnv = new TabletIteratorEnvironment(IteratorScope.minc, acuTableConf);
+      else
+        throw new IllegalArgumentException();
+
+      SortedKeyValueIterator<Key,Value> itr = iterEnv.getTopLevelIterator(IteratorUtil.loadIterators(env.getIteratorScope(), cfsi, extent, acuTableConf,
+          iterators, iterEnv));
+
+      itr.seek(extent.toDataRange(), columnFamilies, inclusive);
+
+      if (!inclusive) {
+        mfw.startDefaultLocalityGroup();
+      } else {
+        mfw.startNewLocalityGroup(lgName, columnFamilies);
+      }
+
+      Span write = Trace.start("write");
+      try {
+        while (itr.hasTop() && env.isCompactionEnabled()) {
+          mfw.append(itr.getTopKey(), itr.getTopValue());
+          itr.next();
+          entriesCompacted++;
+
+          if (entriesCompacted % 1024 == 0) {
+            // Periodically update stats, do not want to do this too often since its volatile
+            entriesWritten.addAndGet(1024);
+          }
+        }
+
+        // the loop ended with entries still pending, so it was stopped by the environment rather than by exhaustion
+        if (itr.hasTop() && !env.isCompactionEnabled()) {
+          // cancel major compaction operation
+          try {
+            try {
+              mfw.close();
+            } catch (IOException e) {
+              log.error(e, e);
+            }
+            fs.deleteRecursively(outputFile.path());
+          } catch (Exception e) {
+            log.warn("Failed to delete Canceled compaction output file " + outputFile, e);
+          }
+          throw new CompactionCanceledException();
+        }
+
+      } finally {
+        // exact counts for this group, independent of the batched AtomicLong updates above
+        CompactionStats lgMajcStats = new CompactionStats(citr.getCount(), entriesCompacted);
+        majCStats.add(lgMajcStats);
+        write.stop();
+      }
+
+    } finally {
+      // close sequence files opened
+      for (FileSKVIterator reader : readers) {
+        try {
+          reader.close();
+        } catch (Throwable e) {
+          log.warn("Failed to close map file", e);
+        }
+      }
+      span.stop();
+    }
+  }
+
+  Collection<FileRef> getFilesToCompact() {
+    return filesToCompact.keySet();
+  }
+
+  boolean hasIMM() {
+    return imm != null;
+  }
+
+  boolean willPropogateDeletes() {
+    return propogateDeletes;
+  }
+
+  // value of the batched counter; it only advances in steps of 1024
+  long getEntriesRead() {
+    return entriesRead.get();
+  }
+  
+  // value of the batched counter; it only advances in steps of 1024
+  long getEntriesWritten() {
+    return entriesWritten.get();
+  }
+
+  long getStartTime() {
+    return startTime;
+  }
+
+  Iterable<IteratorSetting> getIterators() {
+    return this.iterators;
+  }
+
+  // interprets the stored int as an index into MinorCompactionReason.values()
+  MinorCompactionReason getMinCReason() {
+    return MinorCompactionReason.values()[reason];
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7db2abf1/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/DatafileManager.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/DatafileManager.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/DatafileManager.java
new file mode 100644
index 0000000..2771db9
--- /dev/null
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/DatafileManager.java
@@ -0,0 +1,581 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.tserver.tablet;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.metadata.schema.DataFileValue;
+import org.apache.accumulo.core.security.Credentials;
+import org.apache.accumulo.core.util.MapCounter;
+import org.apache.accumulo.core.util.Pair;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
+import org.apache.accumulo.server.ServerConstants;
+import org.apache.accumulo.server.client.HdfsZooInstance;
+import org.apache.accumulo.server.fs.FileRef;
+import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.server.master.state.TServerInstance;
+import org.apache.accumulo.server.security.SystemCredentials;
+import org.apache.accumulo.server.util.MasterMetadataUtil;
+import org.apache.accumulo.server.util.MetadataTableUtil;
+import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
+import org.apache.accumulo.trace.instrument.Span;
+import org.apache.accumulo.trace.instrument.Trace;
+import org.apache.accumulo.tserver.TLevel;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Logger;
+
+class DatafileManager {
+  private final Logger log = Logger.getLogger(DatafileManager.class);
+  // access to datafilesizes needs to be synchronized: see CompactionRunner#getNumFiles
+  private final Map<FileRef,DataFileValue> datafileSizes = Collections.synchronizedMap(new TreeMap<FileRef,DataFileValue>());
+  private final Tablet tablet;
+  
+  // ensure we only have one reader/writer of our bulk file notes at a time
+  private final Object bulkFileImportLock = new Object();
+
+  /**
+   * Creates a manager for the given tablet, seeded with a copy of the supplied file entries.
+   *
+   * @param tablet
+   *          the tablet whose data files are tracked
+   * @param datafileSizes
+   *          initial mapping of data file to its {@link DataFileValue}; the entries are copied and the map itself is not retained
+   */
+  DatafileManager(Tablet tablet, SortedMap<FileRef,DataFileValue> datafileSizes) {
+    this.tablet = tablet;
+    this.datafileSizes.putAll(datafileSizes);
+  }
+
+  private FileRef mergingMinorCompactionFile = null;
+  private final Set<FileRef> filesToDeleteAfterScan = new HashSet<FileRef>();
+  private final Map<Long,Set<FileRef>> scanFileReservations = new HashMap<Long,Set<FileRef>>();
+  private final MapCounter<FileRef> fileScanReferenceCounts = new MapCounter<FileRef>();
+  private long nextScanReservationId = 0;
+  private boolean reservationsBlocked = false;
+
+  private final Set<FileRef> majorCompactingFiles = new HashSet<FileRef>();
+  
+  static void rename(VolumeManager fs, Path src, Path dst) throws IOException {
+    if (!fs.rename(src, dst)) {
+      throw new IOException("Rename " + src + " to " + dst + " returned false ");
+    }
+  }
+
+  Pair<Long,Map<FileRef,DataFileValue>> reserveFilesForScan() {
+    synchronized (tablet) {
+
+      while (reservationsBlocked) {
+        try {
+          tablet.wait(50);
+        } catch (InterruptedException e) {
+          log.warn(e, e);
+        }
+      }
+
+      Set<FileRef> absFilePaths = new HashSet<FileRef>(datafileSizes.keySet());
+
+      long rid = nextScanReservationId++;
+
+      scanFileReservations.put(rid, absFilePaths);
+
+      Map<FileRef,DataFileValue> ret = new HashMap<FileRef,DataFileValue>();
+
+      for (FileRef path : absFilePaths) {
+        fileScanReferenceCounts.increment(path, 1);
+        ret.put(path, datafileSizes.get(path));
+      }
+
+      return new Pair<Long,Map<FileRef,DataFileValue>>(rid, ret);
+    }
+  }
+
+  /**
+   * Releases a reservation made by {@link #reserveFilesForScan()}.
+   * <p>
+   * Each reserved file has its scan reference count decremented. Files whose count reaches zero and that were queued in {@code filesToDeleteAfterScan} have
+   * their scan references removed from the metadata table, outside of the tablet lock.
+   *
+   * @param reservationId
+   *          id previously returned by {@link #reserveFilesForScan()}
+   * @throws IllegalArgumentException
+   *           if the id is not a known reservation
+   * @throws IllegalStateException
+   *           if a reference count drops below zero
+   */
+  void returnFilesForScan(Long reservationId) {
+
+    final Set<FileRef> unreferenced = new HashSet<FileRef>();
+
+    synchronized (tablet) {
+      final Set<FileRef> reservedFiles = scanFileReservations.remove(reservationId);
+      if (reservedFiles == null)
+        throw new IllegalArgumentException("Unknown scan reservation id " + reservationId);
+
+      boolean wakeWaiters = false;
+      for (FileRef file : reservedFiles) {
+        final long remaining = fileScanReferenceCounts.decrement(file, 1);
+        if (remaining < 0)
+          throw new IllegalStateException("Scan ref count for " + file + " is " + remaining);
+
+        if (remaining == 0) {
+          // last scan using this file is done; anyone waiting on the tablet monitor should re-check
+          wakeWaiters = true;
+          if (filesToDeleteAfterScan.remove(file))
+            unreferenced.add(file);
+        }
+      }
+
+      if (wakeWaiters)
+        tablet.notifyAll();
+    }
+
+    if (!unreferenced.isEmpty()) {
+      log.debug("Removing scan refs from metadata " + tablet.getExtent() + " " + unreferenced);
+      MetadataTableUtil.removeScanFiles(tablet.getExtent(), unreferenced, SystemCredentials.get(), tablet.getTabletServer().getLock());
+    }
+  }
+
+  void removeFilesAfterScan(Set<FileRef> scanFiles) {
+    if (scanFiles.size() == 0)
+      return;
+
+    Set<FileRef> filesToDelete = new HashSet<FileRef>();
+
+    synchronized (tablet) {
+      for (FileRef path : scanFiles) {
+        if (fileScanReferenceCounts.get(path) == 0)
+          filesToDelete.add(path);
+        else
+          filesToDeleteAfterScan.add(path);
+      }
+    }
+
+    if (filesToDelete.size() > 0) {
+      log.debug("Removing scan refs from metadata " + tablet.getExtent() + " " + filesToDelete);
+      MetadataTableUtil.removeScanFiles(tablet.getExtent(), filesToDelete, SystemCredentials.get(), tablet.getTabletServer().getLock());
+    }
+  }
+
+  private TreeSet<FileRef> waitForScansToFinish(Set<FileRef> pathsToWaitFor, boolean blockNewScans, long maxWaitTime) {
+    long startTime = System.currentTimeMillis();
+    TreeSet<FileRef> inUse = new TreeSet<FileRef>();
+
+    Span waitForScans = Trace.start("waitForScans");
+    try {
+      synchronized (tablet) {
+        if (blockNewScans) {
+          if (reservationsBlocked)
+            throw new IllegalStateException();
+
+          reservationsBlocked = true;
+        }
+
+        for (FileRef path : pathsToWaitFor) {
+          while (fileScanReferenceCounts.get(path) > 0 && System.currentTimeMillis() - startTime < maxWaitTime) {
+            try {
+              tablet.wait(100);
+            } catch (InterruptedException e) {
+              log.warn(e, e);
+            }
+          }
+        }
+
+        for (FileRef path : pathsToWaitFor) {
+          if (fileScanReferenceCounts.get(path) > 0)
+            inUse.add(path);
+        }
+
+        if (blockNewScans) {
+          reservationsBlocked = false;
+          tablet.notifyAll();
+        }
+
+      }
+    } finally {
+      waitForScans.stop();
+    }
+    return inUse;
+  }
+
+  public void importMapFiles(long tid, Map<FileRef,DataFileValue> pathsString, boolean setTime) throws IOException {
+
+    final KeyExtent extent = tablet.getExtent();
+    String bulkDir = null;
+
+    Map<FileRef,DataFileValue> paths = new HashMap<FileRef,DataFileValue>();
+    for (Entry<FileRef,DataFileValue> entry : pathsString.entrySet())
+      paths.put(entry.getKey(), entry.getValue());
+
+    for (FileRef tpath : paths.keySet()) {
+
+      boolean inTheRightDirectory = false;
+      Path parent = tpath.path().getParent().getParent();
+      for (String tablesDir : ServerConstants.getTablesDirs()) {
+        if (parent.equals(new Path(tablesDir, tablet.getExtent().getTableId().toString()))) {
+          inTheRightDirectory = true;
+          break;
+        }
+      }
+      if (!inTheRightDirectory) {
+        throw new IOException("Data file " + tpath + " not in table dirs");
+      }
+
+      if (bulkDir == null)
+        bulkDir = tpath.path().getParent().toString();
+      else if (!bulkDir.equals(tpath.path().getParent().toString()))
+        throw new IllegalArgumentException("bulk files in different dirs " + bulkDir + " " + tpath);
+
+    }
+
+    if (tablet.getExtent().isRootTablet()) {
+      throw new IllegalArgumentException("Can not import files to root tablet");
+    }
+
+    synchronized (bulkFileImportLock) {
+      Credentials creds = SystemCredentials.get();
+      Connector conn;
+      try {
+        conn = HdfsZooInstance.getInstance().getConnector(creds.getPrincipal(), creds.getToken());
+      } catch (Exception ex) {
+        throw new IOException(ex);
+      }
+      // Remove any bulk files we've previously loaded and compacted away
+      List<FileRef> files = MetadataTableUtil.getBulkFilesLoaded(conn, extent, tid);
+
+      for (FileRef file : files)
+        if (paths.keySet().remove(file))
+          log.debug("Ignoring request to re-import a file already imported: " + extent + ": " + file);
+
+      if (paths.size() > 0) {
+        long bulkTime = Long.MIN_VALUE;
+        if (setTime) {
+          for (DataFileValue dfv : paths.values()) {
+            long nextTime = tablet.getAndUpdateTime();
+            if (nextTime < bulkTime)
+              throw new IllegalStateException("Time went backwards unexpectedly " + nextTime + " " + bulkTime);
+            bulkTime = nextTime;
+            dfv.setTime(bulkTime);
+          }
+        }
+        
+        tablet.updatePersistedTime(bulkTime, paths, tid);
+      }
+    }
+
+    synchronized (tablet) {
+      for (Entry<FileRef,DataFileValue> tpath : paths.entrySet()) {
+        if (datafileSizes.containsKey(tpath.getKey())) {
+          log.error("Adding file that is already in set " + tpath.getKey());
+        }
+        datafileSizes.put(tpath.getKey(), tpath.getValue());
+
+      }
+
+      tablet.getTabletResources().importedMapFiles();
+
+      tablet.computeNumEntries();
+    }
+
+    for (Entry<FileRef,DataFileValue> entry : paths.entrySet()) {
+      log.log(TLevel.TABLET_HIST, tablet.getExtent() + " import " + entry.getKey() + " " + entry.getValue());
+    }
+  }
+
+  FileRef reserveMergingMinorCompactionFile() {
+    if (mergingMinorCompactionFile != null)
+      throw new IllegalStateException("Tried to reserve merging minor compaction file when already reserved  : " + mergingMinorCompactionFile);
+
+    if (tablet.getExtent().isRootTablet())
+      return null;
+
+    int maxFiles = tablet.getTableConfiguration().getMaxFilesPerTablet();
+
+    // when a major compaction is running and we are at max files, write out
+    // one extra file... want to avoid the case where major compaction is
+    // compacting everything except for the largest file, and therefore the
+    // largest file is returned for merging.. the following check mostly
+    // avoids this case, except for the case where major compactions fail or
+    // are canceled
+    if (majorCompactingFiles.size() > 0 && datafileSizes.size() == maxFiles)
+      return null;
+
+    if (datafileSizes.size() >= maxFiles) {
+      // find the smallest file
+
+      long min = Long.MAX_VALUE;
+      FileRef minName = null;
+
+      for (Entry<FileRef,DataFileValue> entry : datafileSizes.entrySet()) {
+        if (entry.getValue().getSize() < min && !majorCompactingFiles.contains(entry.getKey())) {
+          min = entry.getValue().getSize();
+          minName = entry.getKey();
+        }
+      }
+
+      if (minName == null)
+        return null;
+
+      mergingMinorCompactionFile = minName;
+      return minName;
+    }
+
+    return null;
+  }
+
+  void unreserveMergingMinorCompactionFile(FileRef file) {
+    if ((file == null && mergingMinorCompactionFile != null) || (file != null && mergingMinorCompactionFile == null)
+        || (file != null && mergingMinorCompactionFile != null && !file.equals(mergingMinorCompactionFile)))
+      throw new IllegalStateException("Disagreement " + file + " " + mergingMinorCompactionFile);
+
+    mergingMinorCompactionFile = null;
+  }
+
+  void bringMinorCompactionOnline(FileRef tmpDatafile, FileRef newDatafile, FileRef absMergeFile, DataFileValue dfv, CommitSession commitSession, long flushId)
+      throws IOException {
+
+    IZooReaderWriter zoo = ZooReaderWriter.getRetryingInstance();
+    if (tablet.getExtent().isRootTablet()) {
+      try {
+        if (!zoo.isLockHeld(tablet.getTabletServer().getLock().getLockID())) {
+          throw new IllegalStateException();
+        }
+      } catch (Exception e) {
+        throw new IllegalStateException("Can not bring major compaction online, lock not held", e);
+      }
+    }
+
+    // rename before putting in metadata table, so files in metadata table should
+    // always exist
+    do {
+      try {
+        if (dfv.getNumEntries() == 0) {
+          tablet.getTabletServer().getFileSystem().deleteRecursively(tmpDatafile.path());
+        } else {
+          if (tablet.getTabletServer().getFileSystem().exists(newDatafile.path())) {
+            log.warn("Target map file already exist " + newDatafile);
+            tablet.getTabletServer().getFileSystem().deleteRecursively(newDatafile.path());
+          }
+
+          rename(tablet.getTabletServer().getFileSystem(), tmpDatafile.path(), newDatafile.path());
+        }
+        break;
+      } catch (IOException ioe) {
+        log.warn("Tablet " + tablet.getExtent() + " failed to rename " + newDatafile + " after MinC, will retry in 60 secs...", ioe);
+        UtilWaitThread.sleep(60 * 1000);
+      }
+    } while (true);
+
+    long t1, t2;
+
+    // the code below always assumes merged files are in use by scans... this must be done
+    // because the in memory list of files is not updated until after the metadata table
+    // therefore the file is available to scans until memory is updated, but want to ensure
+    // the file is not available for garbage collection... if memory were updated
+    // before this point (like major compactions do), then the following code could wait
+    // for scans to finish like major compactions do.... used to wait for scans to finish
+    // here, but that was incorrect because a scan could start after waiting but before
+    // memory was updated... assuming the file is always in use by scans leads to
+    // one uneeded metadata update when it was not actually in use
+    Set<FileRef> filesInUseByScans = Collections.emptySet();
+    if (absMergeFile != null)
+      filesInUseByScans = Collections.singleton(absMergeFile);
+
+    // very important to write delete entries outside of log lock, because
+    // this metadata write does not go up... it goes sideways or to itself
+    if (absMergeFile != null)
+      MetadataTableUtil.addDeleteEntries(tablet.getExtent(), Collections.singleton(absMergeFile), SystemCredentials.get());
+
+    Set<String> unusedWalLogs = tablet.beginClearingUnusedLogs();
+    try {
+      // the order of writing to metadata and walog is important in the face of machine/process failures
+      // need to write to metadata before writing to walog, when things are done in the reverse order
+      // data could be lost... the minor compaction start even should be written before the following metadata
+      // write is made
+
+      tablet.updateTabletDataFile(commitSession.getMaxCommittedTime(), newDatafile, absMergeFile, dfv, unusedWalLogs, filesInUseByScans, flushId);
+
+    } finally {
+      tablet.finishClearingUnusedLogs();
+    }
+
+    do {
+      try {
+        // the purpose of making this update use the new commit session, instead of the old one passed in,
+        // is because the new one will reference the logs used by current memory...
+        
+        tablet.getTabletServer().minorCompactionFinished(tablet.getTabletMemory().getCommitSession(), newDatafile.toString(), commitSession.getWALogSeq() + 2);
+        break;
+      } catch (IOException e) {
+        log.error("Failed to write to write-ahead log " + e.getMessage() + " will retry", e);
+        UtilWaitThread.sleep(1 * 1000);
+      }
+    } while (true);
+
+    synchronized (tablet) {
+      t1 = System.currentTimeMillis();
+
+      if (datafileSizes.containsKey(newDatafile)) {
+        log.error("Adding file that is already in set " + newDatafile);
+      }
+      
+      if (dfv.getNumEntries() > 0) {
+        datafileSizes.put(newDatafile, dfv);
+      }
+      
+      if (absMergeFile != null) {
+        datafileSizes.remove(absMergeFile);
+      }
+      
+      unreserveMergingMinorCompactionFile(absMergeFile);
+      
+      tablet.flushComplete(flushId);
+      
+      t2 = System.currentTimeMillis();
+    }
+
+    // must do this after list of files in memory is updated above
+    removeFilesAfterScan(filesInUseByScans);
+
+    if (absMergeFile != null)
+      log.log(TLevel.TABLET_HIST, tablet.getExtent() + " MinC [" + absMergeFile + ",memory] -> " + newDatafile);
+    else
+      log.log(TLevel.TABLET_HIST, tablet.getExtent() + " MinC [memory] -> " + newDatafile);
+    log.debug(String.format("MinC finish lock %.2f secs %s", (t2 - t1) / 1000.0, tablet.getExtent().toString()));
+    long splitSize = tablet.getTableConfiguration().getMemoryInBytes(Property.TABLE_SPLIT_THRESHOLD);
+    if (dfv.getSize() > splitSize) {
+      log.debug(String.format("Minor Compaction wrote out file larger than split threshold.  split threshold = %,d  file size = %,d", splitSize, dfv.getSize()));
+    }
+  }
+
+  /**
+   * Marks the given files as inputs of a major compaction.
+   *
+   * @param files
+   *          files the major compaction is about to read
+   * @throws IllegalStateException
+   *           if files are already reserved for a major compaction, or if {@code files} contains the file currently reserved for a merging minor compaction
+   */
+  public void reserveMajorCompactingFiles(Collection<FileRef> files) {
+    if (!majorCompactingFiles.isEmpty())
+      throw new IllegalStateException("Major compacting files not empty " + majorCompactingFiles);
+
+    final boolean minorCompactionHasFile = mergingMinorCompactionFile != null;
+    if (minorCompactionHasFile && files.contains(mergingMinorCompactionFile))
+      throw new IllegalStateException("Major compaction tried to resrve file in use by minor compaction " + mergingMinorCompactionFile);
+
+    majorCompactingFiles.addAll(files);
+  }
+
+  /**
+   * Forgets every file previously reserved for a major compaction.
+   */
+  public void clearMajorCompactingFile() {
+    this.majorCompactingFiles.clear();
+  }
+
+  /**
+   * Publishes the output of a major compaction by swapping the old input files for the new file, both in memory and in persistent state.
+   * <p>
+   * For a non-root tablet the temporary file is renamed first (and removed again if it holds no entries), the in-memory list is updated under the tablet
+   * lock, and afterwards the metadata table is rewritten, keeping scan references for old files that scans still hold. For the root tablet the method waits
+   * for all scans of the old files, verifies the tablet server lock is held and delegates the file replacement to {@code RootFiles.replaceFiles}.
+   *
+   * @param oldDatafiles
+   *          input files of the compaction, to be removed from the tablet
+   * @param tmpDatafile
+   *          temporary file written by the compaction
+   * @param newDatafile
+   *          final name of the compaction output
+   * @param compactionId
+   *          id stored as the tablet's last compaction id and written with the metadata update
+   * @param dfv
+   *          size and entry count of the new file; with zero entries the file is not added to the in-memory list
+   * @throws IOException
+   *           if the rename or another file system or metadata operation fails
+   * @throws IllegalStateException
+   *           if the target file already exists for a non-root tablet, or the tablet server lock is not held for the root tablet
+   */
+  void bringMajorCompactionOnline(Set<FileRef> oldDatafiles, FileRef tmpDatafile, FileRef newDatafile, Long compactionId, DataFileValue dfv)
+      throws IOException {
+    final KeyExtent extent = tablet.getExtent();
+    long lockStart, lockEnd;
+
+    if (!extent.isRootTablet()) {
+
+      if (tablet.getTabletServer().getFileSystem().exists(newDatafile.path())) {
+        log.error("Target map file already exist " + newDatafile, new Exception());
+        throw new IllegalStateException("Target map file already exist " + newDatafile);
+      }
+
+      // rename before putting in metadata table, so files in metadata table should
+      // always exist
+      rename(tablet.getTabletServer().getFileSystem(), tmpDatafile.path(), newDatafile.path());
+
+      if (dfv.getNumEntries() == 0) {
+        tablet.getTabletServer().getFileSystem().deleteRecursively(newDatafile.path());
+      }
+    }
+
+    TServerInstance previousLocation = null;
+    synchronized (tablet) {
+
+      lockStart = System.currentTimeMillis();
+
+      final IZooReaderWriter zoo = ZooReaderWriter.getRetryingInstance();
+
+      tablet.incrementDataSourceDeletions();
+
+      if (extent.isRootTablet()) {
+
+        // block new scans and wait, without a time limit, for existing scans of the old files
+        waitForScansToFinish(oldDatafiles, true, Long.MAX_VALUE);
+
+        try {
+          if (!zoo.isLockHeld(tablet.getTabletServer().getLock().getLockID())) {
+            throw new IllegalStateException();
+          }
+        } catch (Exception e) {
+          throw new IllegalStateException("Can not bring major compaction online, lock not held", e);
+        }
+
+        // mark files as ready for deletion, but
+        // do not delete them until we successfully
+        // rename the compacted map file, in case
+        // the system goes down
+
+        RootFiles.replaceFiles(tablet.getTableConfiguration(), tablet.getTabletServer().getFileSystem(), tablet.getLocation(), oldDatafiles, tmpDatafile, newDatafile);
+      }
+
+      // atomically remove old files and add new file
+      for (FileRef retired : oldDatafiles) {
+        if (!datafileSizes.containsKey(retired)) {
+          log.error("file does not exist in set " + retired);
+        }
+        datafileSizes.remove(retired);
+        majorCompactingFiles.remove(retired);
+      }
+
+      if (datafileSizes.containsKey(newDatafile)) {
+        log.error("Adding file that is already in set " + newDatafile);
+      }
+
+      if (dfv.getNumEntries() > 0) {
+        datafileSizes.put(newDatafile, dfv);
+      }
+
+      // could be used by a follow on compaction in a multipass compaction
+      majorCompactingFiles.add(newDatafile);
+
+      tablet.computeNumEntries();
+
+      previousLocation = tablet.resetLastLocation();
+
+      tablet.setLastCompactionID(compactionId);
+      lockEnd = System.currentTimeMillis();
+    }
+
+    if (!extent.isRootTablet()) {
+      // give scans up to ten seconds to let go of the old files; the rest keep a scan reference in metadata
+      final Set<FileRef> filesInUseByScans = waitForScansToFinish(oldDatafiles, false, 10000);
+      if (!filesInUseByScans.isEmpty())
+        log.debug("Adding scan refs to metadata " + extent + " " + filesInUseByScans);
+      MasterMetadataUtil.replaceDatafiles(extent, oldDatafiles, filesInUseByScans, newDatafile, compactionId, dfv, SystemCredentials.get(),
+          tablet.getTabletServer().getClientAddressString(), previousLocation, tablet.getTabletServer().getLock());
+      removeFilesAfterScan(filesInUseByScans);
+    }
+
+    log.debug(String.format("MajC finish lock %.2f secs", (lockEnd - lockStart) / 1000.0));
+    log.log(TLevel.TABLET_HIST, extent + " MajC " + oldDatafiles + " --> " + newDatafile);
+  }
+
+  /**
+   * Takes a snapshot of the tablet's data files and their values.
+   *
+   * @return an unmodifiable sorted copy of the current file to {@link DataFileValue} mapping, made while holding the tablet lock
+   */
+  public SortedMap<FileRef,DataFileValue> getDatafileSizes() {
+    synchronized (tablet) {
+      return Collections.unmodifiableSortedMap(new TreeMap<FileRef,DataFileValue>(datafileSizes));
+    }
+  }
+
+  /**
+   * Takes a snapshot of the tablet's data files.
+   *
+   * @return an unmodifiable copy of the current set of files, made while holding the tablet lock
+   */
+  public Set<FileRef> getFiles() {
+    synchronized (tablet) {
+      return Collections.unmodifiableSet(new HashSet<FileRef>(datafileSizes.keySet()));
+    }
+  }
+  
+  /**
+   * Counts the data files currently tracked for the tablet. Unlike the snapshot getters, this does not take the tablet lock.
+   *
+   * @return the number of entries in the file map
+   */
+  public int getNumFiles() {
+    final int fileCount = datafileSizes.size();
+    return fileCount;
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7db2abf1/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/KVEntry.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/KVEntry.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/KVEntry.java
new file mode 100644
index 0000000..4919be9
--- /dev/null
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/KVEntry.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.tserver.tablet;
+
+import java.util.Arrays;
+
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.KeyValue;
+import org.apache.accumulo.core.data.Value;
+
+/**
+ * A {@link KeyValue} built from private copies of a key and of a value's bytes.
+ */
+public class KVEntry extends KeyValue {
+  private static final long serialVersionUID = 1L;
+
+  // fixed part of the memory estimate: overhead is 32 per object, nine objects counted
+  private static final int ESTIMATED_OVERHEAD = 9 * 32;
+
+  /**
+   * Builds an entry from copies of the arguments, so later changes to {@code k} or to the byte array behind {@code v} do not affect this entry.
+   *
+   * @param k
+   *          key to copy
+   * @param v
+   *          value whose bytes are copied
+   */
+  public KVEntry(Key k, Value v) {
+    super(new Key(k), v.get().clone());
+  }
+
+  /**
+   * @return the size reported by the key plus the length of the value's byte array
+   */
+  int numBytes() {
+    final int keyBytes = getKey().getSize();
+    final int valueBytes = getValue().get().length;
+    return keyBytes + valueBytes;
+  }
+
+  /**
+   * @return the key size plus the value length plus a fixed allowance for object overhead
+   */
+  int estimateMemoryUsed() {
+    final int keyBytes = getKey().getSize();
+    final int valueBytes = getValue().get().length;
+    return keyBytes + valueBytes + ESTIMATED_OVERHEAD;
+  }
+}
\ No newline at end of file


Mime
View raw message