accumulo-commits mailing list archives

From ctubb...@apache.org
Subject [29/53] [abbrv] ACCUMULO-658 consistent package names to avoid overlapped sealed jars
Date Fri, 06 Sep 2013 01:48:57 GMT
http://git-wip-us.apache.org/repos/asf/accumulo/blob/fc9363a0/server/tserver/src/main/java/org/apache/accumulo/tserver/Tablet.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/Tablet.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/Tablet.java
new file mode 100644
index 0000000..17be615
--- /dev/null
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/Tablet.java
@@ -0,0 +1,3842 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.tserver;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.PriorityQueue;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.impl.ScannerImpl;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.ConfigurationObserver;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.constraints.Violations;
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Column;
+import org.apache.accumulo.core.data.ColumnUpdate;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.data.KeyValue;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.data.thrift.IterInfo;
+import org.apache.accumulo.core.data.thrift.MapFileInfo;
+import org.apache.accumulo.core.file.FileOperations;
+import org.apache.accumulo.core.file.FileSKVIterator;
+import org.apache.accumulo.core.iterators.IterationInterruptedException;
+import org.apache.accumulo.core.iterators.IteratorEnvironment;
+import org.apache.accumulo.core.iterators.IteratorUtil;
+import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.iterators.system.ColumnFamilySkippingIterator;
+import org.apache.accumulo.core.iterators.system.ColumnQualifierFilter;
+import org.apache.accumulo.core.iterators.system.DeletingIterator;
+import org.apache.accumulo.core.iterators.system.InterruptibleIterator;
+import org.apache.accumulo.core.iterators.system.MultiIterator;
+import org.apache.accumulo.core.iterators.system.SourceSwitchingIterator;
+import org.apache.accumulo.core.iterators.system.SourceSwitchingIterator.DataSource;
+import org.apache.accumulo.core.iterators.system.StatsIterator;
+import org.apache.accumulo.core.iterators.system.VisibilityFilter;
+import org.apache.accumulo.core.master.thrift.TabletLoadState;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.RootTable;
+import org.apache.accumulo.core.metadata.schema.DataFileValue;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LogColumnFamily;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ScanFileColumnFamily;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.security.ColumnVisibility;
+import org.apache.accumulo.core.security.Credentials;
+import org.apache.accumulo.core.util.CachedConfiguration;
+import org.apache.accumulo.core.util.LocalityGroupUtil;
+import org.apache.accumulo.core.util.LocalityGroupUtil.LocalityGroupConfigurationError;
+import org.apache.accumulo.core.util.Pair;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
+import org.apache.accumulo.server.ServerConstants;
+import org.apache.accumulo.server.client.HdfsZooInstance;
+import org.apache.accumulo.server.conf.TableConfiguration;
+import org.apache.accumulo.server.fs.FileRef;
+import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.server.fs.VolumeManagerImpl;
+import org.apache.accumulo.server.master.state.TServerInstance;
+import org.apache.accumulo.server.master.tableOps.CompactionIterators;
+import org.apache.accumulo.server.problems.ProblemReport;
+import org.apache.accumulo.server.problems.ProblemReports;
+import org.apache.accumulo.server.problems.ProblemType;
+import org.apache.accumulo.server.security.SystemCredentials;
+import org.apache.accumulo.server.tablets.TabletTime;
+import org.apache.accumulo.server.tablets.UniqueNameAllocator;
+import org.apache.accumulo.server.util.FileUtil;
+import org.apache.accumulo.server.util.MapCounter;
+import org.apache.accumulo.server.util.MasterMetadataUtil;
+import org.apache.accumulo.server.util.MetadataTableUtil;
+import org.apache.accumulo.server.util.MetadataTableUtil.LogEntry;
+import org.apache.accumulo.server.util.TabletOperations;
+import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
+import org.apache.accumulo.start.classloader.vfs.AccumuloVFSClassLoader;
+import org.apache.accumulo.trace.instrument.Span;
+import org.apache.accumulo.trace.instrument.Trace;
+import org.apache.accumulo.tserver.Compactor.CompactionCanceledException;
+import org.apache.accumulo.tserver.Compactor.CompactionEnv;
+import org.apache.accumulo.tserver.FileManager.ScanFileManager;
+import org.apache.accumulo.tserver.InMemoryMap.MemoryIterator;
+import org.apache.accumulo.tserver.TabletServer.TservConstraintEnv;
+import org.apache.accumulo.tserver.TabletServerResourceManager.TabletResourceManager;
+import org.apache.accumulo.tserver.TabletStatsKeeper.Operation;
+import org.apache.accumulo.tserver.constraints.ConstraintChecker;
+import org.apache.accumulo.tserver.log.DfsLogger;
+import org.apache.accumulo.tserver.log.MutationReceiver;
+import org.apache.accumulo.tserver.mastermessage.TabletStatusMessage;
+import org.apache.accumulo.tserver.metrics.TabletServerMinCMetrics;
+import org.apache.commons.codec.DecoderException;
+import org.apache.commons.codec.binary.Hex;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.NoNodeException;
+
+/*
+ * We need to be able to have the master tell a tablet server to close this file, and the
+ * tablet server to handle all pending client reads before closing.
+ */
+
+/**
+ * Provides an interface to read from a MapFile; mostly takes care of reporting start and end keys.
+ * 
+ * Needed because a single row extent can have multiple columns; this manages all the columns (each
+ * handled by a store) for a single row extent.
+ */
+
+public class Tablet {
+  
+  enum MajorCompactionReason {
+    // do not change the order, the order of this enum determines the order
+    // in which queued major compactions are executed
+    USER,
+    CHOP,
+    NORMAL,
+    IDLE
+  }
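+  // a minimal illustration of the ordinal-based priority above: if queued reasons are
+  // compared with compareTo(), e.g. in a PriorityQueue<MajorCompactionReason>, they sort
+  // USER < CHOP < NORMAL < IDLE, so USER-requested compactions are executed first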
+  
+  enum MinorCompactionReason {
+    USER, SYSTEM, CLOSE
+  }
+  
+  public class CommitSession {
+    
+    private int seq;
+    private InMemoryMap memTable;
+    private int commitsInProgress;
+    private long maxCommittedTime = Long.MIN_VALUE;
+    
+    private CommitSession(int seq, InMemoryMap imm) {
+      this.seq = seq;
+      this.memTable = imm;
+      commitsInProgress = 0;
+    }
+    
+    public int getWALogSeq() {
+      return seq;
+    }
+    
+    private void decrementCommitsInProgress() {
+      if (commitsInProgress < 1)
+        throw new IllegalStateException("commitsInProgress = " + commitsInProgress);
+      
+      commitsInProgress--;
+      if (commitsInProgress == 0)
+        Tablet.this.notifyAll();
+    }
+    
+    private void incrementCommitsInProgress() {
+      if (commitsInProgress < 0)
+        throw new IllegalStateException("commitsInProgress = " + commitsInProgress);
+      
+      commitsInProgress++;
+    }
+    
+    private void waitForCommitsToFinish() {
+      while (commitsInProgress > 0) {
+        try {
+          Tablet.this.wait(50);
+        } catch (InterruptedException e) {
+          log.warn(e, e);
+        }
+      }
+    }
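+    // note: the increment/decrement/wait methods above assume the caller holds the
+    // Tablet.this monitor; decrementCommitsInProgress() relies on that lock when it
+    // calls Tablet.this.notifyAll() to wake waitForCommitsToFinish()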
+    
+    public void abortCommit(List<Mutation> value) {
+      Tablet.this.abortCommit(this, value);
+    }
+    
+    public void commit(List<Mutation> mutations) {
+      Tablet.this.commit(this, mutations);
+    }
+    
+    public Tablet getTablet() {
+      return Tablet.this;
+    }
+    
+    public boolean beginUpdatingLogsUsed(ArrayList<DfsLogger> copy, boolean mincFinish) {
+      return Tablet.this.beginUpdatingLogsUsed(memTable, copy, mincFinish);
+    }
+    
+    public void finishUpdatingLogsUsed() {
+      Tablet.this.finishUpdatingLogsUsed();
+    }
+    
+    public int getLogId() {
+      return logId;
+    }
+    
+    public KeyExtent getExtent() {
+      return extent;
+    }
+    
+    private void updateMaxCommittedTime(long time) {
+      maxCommittedTime = Math.max(time, maxCommittedTime);
+    }
+    
+    private long getMaxCommittedTime() {
+      if (maxCommittedTime == Long.MIN_VALUE)
+        throw new IllegalStateException("Tried to read max committed time when it was never set");
+      return maxCommittedTime;
+    }
+    
+  }
+  
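+  /*
+   * A rough sketch of the memtable hand-off during a minor compaction:
+   *
+   *   writes go to memTable
+   *   prepareForMinC(): memTable -> otherMemTable, and a fresh memTable takes new writes
+   *   finishedMinC():   otherMemTable -> deletingMemTable (its data is now in a file)
+   *   finalizeMinC():   deletingMemTable is deleted and its memory is released
+   *
+   * memoryReservedForMinC() is true while either otherMemTable or deletingMemTable exists.
+   */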
+  private class TabletMemory {
+    private InMemoryMap memTable;
+    private InMemoryMap otherMemTable;
+    private InMemoryMap deletingMemTable;
+    private int nextSeq = 1;
+    private CommitSession commitSession;
+    
+    TabletMemory() {
+      try {
+        memTable = new InMemoryMap(acuTableConf);
+      } catch (LocalityGroupConfigurationError e) {
+        throw new RuntimeException(e);
+      }
+      commitSession = new CommitSession(nextSeq, memTable);
+      nextSeq += 2;
+    }
+    
+    InMemoryMap getMemTable() {
+      return memTable;
+    }
+    
+    InMemoryMap getMinCMemTable() {
+      return otherMemTable;
+    }
+    
+    CommitSession prepareForMinC() {
+      if (otherMemTable != null) {
+        throw new IllegalStateException();
+      }
+      
+      if (deletingMemTable != null) {
+        throw new IllegalStateException();
+      }
+      
+      otherMemTable = memTable;
+      try {
+        memTable = new InMemoryMap(acuTableConf);
+      } catch (LocalityGroupConfigurationError e) {
+        throw new RuntimeException(e);
+      }
+      
+      CommitSession oldCommitSession = commitSession;
+      commitSession = new CommitSession(nextSeq, memTable);
+      nextSeq += 2;
+      
+      tabletResources.updateMemoryUsageStats(memTable.estimatedSizeInBytes(), otherMemTable.estimatedSizeInBytes());
+      
+      return oldCommitSession;
+    }
+    
+    void finishedMinC() {
+      
+      if (otherMemTable == null) {
+        throw new IllegalStateException();
+      }
+      
+      if (deletingMemTable != null) {
+        throw new IllegalStateException();
+      }
+      
+      deletingMemTable = otherMemTable;
+      
+      otherMemTable = null;
+      Tablet.this.notifyAll();
+    }
+    
+    void finalizeMinC() {
+      try {
+        deletingMemTable.delete(15000);
+      } finally {
+        synchronized (Tablet.this) {
+          if (otherMemTable != null) {
+            throw new IllegalStateException();
+          }
+          
+          if (deletingMemTable == null) {
+            throw new IllegalStateException();
+          }
+          
+          deletingMemTable = null;
+          
+          tabletResources.updateMemoryUsageStats(memTable.estimatedSizeInBytes(), 0);
+        }
+      }
+    }
+    
+    boolean memoryReservedForMinC() {
+      return otherMemTable != null || deletingMemTable != null;
+    }
+    
+    void waitForMinC() {
+      while (otherMemTable != null || deletingMemTable != null) {
+        try {
+          Tablet.this.wait(50);
+        } catch (InterruptedException e) {
+          log.warn(e, e);
+        }
+      }
+    }
+    
+    void mutate(CommitSession cm, List<Mutation> mutations) {
+      cm.memTable.mutate(mutations);
+    }
+    
+    void updateMemoryUsageStats() {
+      long other = 0;
+      if (otherMemTable != null)
+        other = otherMemTable.estimatedSizeInBytes();
+      else if (deletingMemTable != null)
+        other = deletingMemTable.estimatedSizeInBytes();
+      
+      tabletResources.updateMemoryUsageStats(memTable.estimatedSizeInBytes(), other);
+    }
+    
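+    // iterators handed out here pin the active memtable (and the minor-compacting one,
+    // if present); callers are expected to hand them back through returnIterators() so
+    // the memory can be released once a minor compaction finalizes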
+    List<MemoryIterator> getIterators() {
+      List<MemoryIterator> toReturn = new ArrayList<MemoryIterator>(2);
+      toReturn.add(memTable.skvIterator());
+      if (otherMemTable != null)
+        toReturn.add(otherMemTable.skvIterator());
+      return toReturn;
+    }
+    
+    void returnIterators(List<MemoryIterator> iters) {
+      for (MemoryIterator iter : iters) {
+        iter.close();
+      }
+    }
+    
+    public long getNumEntries() {
+      if (otherMemTable != null)
+        return memTable.getNumEntries() + otherMemTable.getNumEntries();
+      return memTable.getNumEntries();
+    }
+    
+    CommitSession getCommitSession() {
+      return commitSession;
+    }
+  }
+  
+  private TabletMemory tabletMemory;
+  
+  private final TabletTime tabletTime;
+  private long persistedTime;
+  private final Object timeLock = new Object();
+  
+  private Path location; // absolute path of this tablet's dir
+  private TServerInstance lastLocation;
+  
+  private Configuration conf;
+  private VolumeManager fs;
+  
+  private TableConfiguration acuTableConf;
+  
+  private volatile boolean tableDirChecked = false;
+  
+  private AtomicLong dataSourceDeletions = new AtomicLong(0);
+  private Set<ScanDataSource> activeScans = new HashSet<ScanDataSource>();
+  
+  private volatile boolean closing = false;
+  private boolean closed = false;
+  private boolean closeComplete = false;
+  
+  private long lastFlushID = -1;
+  private long lastCompactID = -1;
+  
+  private KeyExtent extent;
+  
+  private TabletResourceManager tabletResources;
+  final private DatafileManager datafileManager;
+  private volatile boolean majorCompactionInProgress = false;
+  private volatile boolean majorCompactionWaitingToStart = false;
+  private Set<MajorCompactionReason> majorCompactionQueued = Collections.synchronizedSet(EnumSet.noneOf(MajorCompactionReason.class));
+  private volatile boolean minorCompactionInProgress = false;
+  private volatile boolean minorCompactionWaitingToStart = false;
+  
+  private boolean updatingFlushID = false;
+  
+  private AtomicReference<ConstraintChecker> constraintChecker = new AtomicReference<ConstraintChecker>();
+  
+  private String tabletDirectory;
+  
+  private int writesInProgress = 0;
+  
+  private static final Logger log = Logger.getLogger(Tablet.class);
+  public TabletStatsKeeper timer;
+  
+  private Rate queryRate = new Rate(0.2);
+  private long queryCount = 0;
+  
+  private Rate queryByteRate = new Rate(0.2);
+  private long queryBytes = 0;
+  
+  private Rate ingestRate = new Rate(0.2);
+  private long ingestCount = 0;
+  
+  private Rate ingestByteRate = new Rate(0.2);
+  private long ingestBytes = 0;
+  
+  private byte[] defaultSecurityLabel = new byte[0];
+  
+  private long lastMinorCompactionFinishTime;
+  private long lastMapFileImportTime;
+  
+  private volatile long numEntries;
+  private volatile long numEntriesInMemory;
+  
+  // a count of the amount of data read by the iterators
+  private AtomicLong scannedCount = new AtomicLong(0);
+  private Rate scannedRate = new Rate(0.2);
+  
+  private ConfigurationObserver configObserver;
+  
+  private TabletServer tabletServer;
+  
+  private final int logId;
+  // ensure we only have one reader/writer of our bulk file notes at a time
+  public final Object bulkFileImportLock = new Object();
+  
+  public int getLogId() {
+    return logId;
+  }
+  
+  public static class TabletClosedException extends RuntimeException {
+    public TabletClosedException(Exception e) {
+      super(e);
+    }
+    
+    public TabletClosedException() {
+      super();
+    }
+    
+    private static final long serialVersionUID = 1L;
+  }
+  
+  FileRef getNextMapFilename(String prefix) throws IOException {
+    String extension = FileOperations.getNewFileExtension(tabletServer.getTableConfiguration(extent));
+    checkTabletDir();
+    return new FileRef(location.toString() + "/" + prefix + UniqueNameAllocator.getInstance().getNextName() + "." + extension);
+  }
+  
+  private void checkTabletDir() throws IOException {
+    if (!tableDirChecked) {
+      checkTabletDir(this.location);
+      tableDirChecked = true;
+    }
+  }
+  
+  private void checkTabletDir(Path tabletDir) throws IOException {
+    
+    FileStatus[] files = null;
+    try {
+      files = fs.listStatus(tabletDir);
+    } catch (FileNotFoundException ex) {
+      // ignored
+    }
+    
+    if (files == null) {
+      if (tabletDir.getName().startsWith("c-"))
+        log.debug("Tablet " + extent + " had no dir, creating " + tabletDir); // it's a clone dir...
+      else
+        log.warn("Tablet " + extent + " had no dir, creating " + tabletDir);
+      
+      fs.mkdirs(tabletDir);
+    }
+  }
+  
+  class DatafileManager {
+    // access to datafileSizes needs to be synchronized: see CompactionRunner#getNumFiles
+    final private Map<FileRef,DataFileValue> datafileSizes = Collections.synchronizedMap(new TreeMap<FileRef,DataFileValue>());
+    
+    DatafileManager(SortedMap<FileRef,DataFileValue> datafileSizes) {
+      for (Entry<FileRef,DataFileValue> datafiles : datafileSizes.entrySet())
+        this.datafileSizes.put(datafiles.getKey(), datafiles.getValue());
+    }
+    
+    FileRef mergingMinorCompactionFile = null;
+    Set<FileRef> filesToDeleteAfterScan = new HashSet<FileRef>();
+    Map<Long,Set<FileRef>> scanFileReservations = new HashMap<Long,Set<FileRef>>();
+    MapCounter<FileRef> fileScanReferenceCounts = new MapCounter<FileRef>();
+    long nextScanReservationId = 0;
+    boolean reservationsBlocked = false;
+    
+    Set<FileRef> majorCompactingFiles = new HashSet<FileRef>();
+    
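+    // scans reserve the current file set under a reservation id; every reserved file's
+    // reference count goes up by one, and returnFilesForScan() decrements it again.
+    // files queued in filesToDeleteAfterScan are only removed from the metadata table
+    // once their count falls back to zero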
+    Pair<Long,Map<FileRef,DataFileValue>> reserveFilesForScan() {
+      synchronized (Tablet.this) {
+        
+        while (reservationsBlocked) {
+          try {
+            Tablet.this.wait(50);
+          } catch (InterruptedException e) {
+            log.warn(e, e);
+          }
+        }
+        
+        Set<FileRef> absFilePaths = new HashSet<FileRef>(datafileSizes.keySet());
+        
+        long rid = nextScanReservationId++;
+        
+        scanFileReservations.put(rid, absFilePaths);
+        
+        Map<FileRef,DataFileValue> ret = new HashMap<FileRef,DataFileValue>();
+        
+        for (FileRef path : absFilePaths) {
+          fileScanReferenceCounts.increment(path, 1);
+          ret.put(path, datafileSizes.get(path));
+        }
+        
+        return new Pair<Long,Map<FileRef,DataFileValue>>(rid, ret);
+      }
+    }
+    
+    void returnFilesForScan(Long reservationId) {
+      
+      final Set<FileRef> filesToDelete = new HashSet<FileRef>();
+      
+      synchronized (Tablet.this) {
+        Set<FileRef> absFilePaths = scanFileReservations.remove(reservationId);
+        
+        if (absFilePaths == null)
+          throw new IllegalArgumentException("Unknown scan reservation id " + reservationId);
+        
+        boolean notify = false;
+        for (FileRef path : absFilePaths) {
+          long refCount = fileScanReferenceCounts.decrement(path, 1);
+          if (refCount == 0) {
+            if (filesToDeleteAfterScan.remove(path))
+              filesToDelete.add(path);
+            notify = true;
+          } else if (refCount < 0)
+            throw new IllegalStateException("Scan ref count for " + path + " is " + refCount);
+        }
+        
+        if (notify)
+          Tablet.this.notifyAll();
+      }
+      
+      if (filesToDelete.size() > 0) {
+        log.debug("Removing scan refs from metadata " + extent + " " + filesToDelete);
+        MetadataTableUtil.removeScanFiles(extent, filesToDelete, SystemCredentials.get(), tabletServer.getLock());
+      }
+    }
+    
+    private void removeFilesAfterScan(Set<FileRef> scanFiles) {
+      if (scanFiles.size() == 0)
+        return;
+      
+      Set<FileRef> filesToDelete = new HashSet<FileRef>();
+      
+      synchronized (Tablet.this) {
+        for (FileRef path : scanFiles) {
+          if (fileScanReferenceCounts.get(path) == 0)
+            filesToDelete.add(path);
+          else
+            filesToDeleteAfterScan.add(path);
+        }
+      }
+      
+      if (filesToDelete.size() > 0) {
+        log.debug("Removing scan refs from metadata " + extent + " " + filesToDelete);
+        MetadataTableUtil.removeScanFiles(extent, filesToDelete, SystemCredentials.get(), tabletServer.getLock());
+      }
+    }
+    
+    private TreeSet<FileRef> waitForScansToFinish(Set<FileRef> pathsToWaitFor, boolean blockNewScans, long maxWaitTime) {
+      long startTime = System.currentTimeMillis();
+      TreeSet<FileRef> inUse = new TreeSet<FileRef>();
+      
+      Span waitForScans = Trace.start("waitForScans");
+      try {
+        synchronized (Tablet.this) {
+          if (blockNewScans) {
+            if (reservationsBlocked)
+              throw new IllegalStateException();
+            
+            reservationsBlocked = true;
+          }
+          
+          for (FileRef path : pathsToWaitFor) {
+            while (fileScanReferenceCounts.get(path) > 0 && System.currentTimeMillis() - startTime < maxWaitTime) {
+              try {
+                Tablet.this.wait(100);
+              } catch (InterruptedException e) {
+                log.warn(e, e);
+              }
+            }
+          }
+          
+          for (FileRef path : pathsToWaitFor) {
+            if (fileScanReferenceCounts.get(path) > 0)
+              inUse.add(path);
+          }
+          
+          if (blockNewScans) {
+            reservationsBlocked = false;
+            Tablet.this.notifyAll();
+          }
+          
+        }
+      } finally {
+        waitForScans.stop();
+      }
+      return inUse;
+    }
+    
+    public void importMapFiles(long tid, Map<FileRef,DataFileValue> pathsString, boolean setTime) throws IOException {
+      
+      String bulkDir = null;
+      
+      Map<FileRef,DataFileValue> paths = new HashMap<FileRef,DataFileValue>();
+      for (Entry<FileRef,DataFileValue> entry : pathsString.entrySet())
+        paths.put(entry.getKey(), entry.getValue());
+      
+      for (FileRef tpath : paths.keySet()) {
+        
+        boolean inTheRightDirectory = false;
+        Path parent = tpath.path().getParent().getParent();
+        for (String tablesDir : ServerConstants.getTablesDirs()) {
+          if (parent.equals(new Path(tablesDir, extent.getTableId().toString()))) {
+            inTheRightDirectory = true;
+            break;
+          }
+        }
+        if (!inTheRightDirectory) {
+          throw new IOException("Data file " + tpath + " not in table dirs");
+        }
+        
+        if (bulkDir == null)
+          bulkDir = tpath.path().getParent().toString();
+        else if (!bulkDir.equals(tpath.path().getParent().toString()))
+          throw new IllegalArgumentException("bulk files in different dirs " + bulkDir + " " + tpath);
+        
+      }
+      
+      if (extent.isRootTablet()) {
+        throw new IllegalArgumentException("Can not import files to root tablet");
+      }
+      
+      synchronized (bulkFileImportLock) {
+        Credentials creds = SystemCredentials.get();
+        Connector conn;
+        try {
+          conn = HdfsZooInstance.getInstance().getConnector(creds.getPrincipal(), creds.getToken());
+        } catch (Exception ex) {
+          throw new IOException(ex);
+        }
+        // Remove any bulk files we've previously loaded and compacted away
+        List<FileRef> files = MetadataTableUtil.getBulkFilesLoaded(conn, extent, tid);
+        
+        for (FileRef file : files)
+          if (paths.keySet().remove(file))
+            log.debug("Ignoring request to re-import a file already imported: " + extent + ": " + file);
+        
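+        // when setTime is requested, each imported file gets a non-decreasing timestamp
+        // from tabletTime, and persistedTime is advanced under timeLock before the
+        // metadata update so the tablet's time never moves backwards on restart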
+        if (paths.size() > 0) {
+          long bulkTime = Long.MIN_VALUE;
+          if (setTime) {
+            for (DataFileValue dfv : paths.values()) {
+              long nextTime = tabletTime.getAndUpdateTime();
+              if (nextTime < bulkTime)
+                throw new IllegalStateException("Time went backwards unexpectedly " + nextTime + " " + bulkTime);
+              bulkTime = nextTime;
+              dfv.setTime(bulkTime);
+            }
+          }
+          
+          synchronized (timeLock) {
+            if (bulkTime > persistedTime)
+              persistedTime = bulkTime;
+            
+            MetadataTableUtil.updateTabletDataFile(tid, extent, paths, tabletTime.getMetadataValue(persistedTime), creds, tabletServer.getLock());
+          }
+        }
+      }
+      
+      synchronized (Tablet.this) {
+        for (Entry<FileRef,DataFileValue> tpath : paths.entrySet()) {
+          if (datafileSizes.containsKey(tpath.getKey())) {
+            log.error("Adding file that is already in set " + tpath.getKey());
+          }
+          datafileSizes.put(tpath.getKey(), tpath.getValue());
+          
+        }
+        
+        tabletResources.importedMapFiles();
+        
+        computeNumEntries();
+      }
+      
+      for (FileRef tpath : paths.keySet()) {
+        log.log(TLevel.TABLET_HIST, extent + " import " + tpath + " " + paths.get(tpath));
+      }
+    }
+    
+    FileRef reserveMergingMinorCompactionFile() {
+      if (mergingMinorCompactionFile != null)
+        throw new IllegalStateException("Tried to reserve merging minor compaction file when already reserved  : " + mergingMinorCompactionFile);
+      
+      if (extent.isRootTablet())
+        return null;
+      
+      int maxFiles = acuTableConf.getMaxFilesPerTablet();
+      
+      // when a major compaction is running and we are at max files, write out
+      // one extra file... we want to avoid the case where a major compaction is
+      // compacting everything except the largest file, so the largest file would
+      // be the one returned for merging. The following check mostly avoids this
+      // case, except when major compactions fail or are canceled.
+      if (majorCompactingFiles.size() > 0 && datafileSizes.size() == maxFiles)
+        return null;
+      
+      if (datafileSizes.size() >= maxFiles) {
+        // find the smallest file
+        
+        long min = Long.MAX_VALUE;
+        FileRef minName = null;
+        
+        for (Entry<FileRef,DataFileValue> entry : datafileSizes.entrySet()) {
+          if (entry.getValue().getSize() < min && !majorCompactingFiles.contains(entry.getKey())) {
+            min = entry.getValue().getSize();
+            minName = entry.getKey();
+          }
+        }
+        
+        if (minName == null)
+          return null;
+        
+        mergingMinorCompactionFile = minName;
+        return minName;
+      }
+      
+      return null;
+    }
+    
+    void unreserveMergingMinorCompactionFile(FileRef file) {
+      if ((file == null && mergingMinorCompactionFile != null) || (file != null && mergingMinorCompactionFile == null)
+          || (file != null && mergingMinorCompactionFile != null && !file.equals(mergingMinorCompactionFile)))
+        throw new IllegalStateException("Disagreement " + file + " " + mergingMinorCompactionFile);
+      
+      mergingMinorCompactionFile = null;
+    }
+    
+    void bringMinorCompactionOnline(FileRef tmpDatafile, FileRef newDatafile, FileRef absMergeFile, DataFileValue dfv, CommitSession commitSession, long flushId)
+        throws IOException {
+      
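+      // a rough ordering sketch for bringing a minor compaction online: rename the tmp
+      // file into place, write the !METADATA entry (while clearing unused walogs), log
+      // the minc-finish event to the walog, then update the in-memory file list; each
+      // step below either retries or is safe to redo if the process dies in between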
+      IZooReaderWriter zoo = ZooReaderWriter.getRetryingInstance();
+      if (extent.isRootTablet()) {
+        try {
+          if (!zoo.isLockHeld(tabletServer.getLock().getLockID())) {
+            throw new IllegalStateException();
+          }
+        } catch (Exception e) {
+          throw new IllegalStateException("Can not bring major compaction online, lock not held", e);
+        }
+      }
+      
+      // rename before putting in metadata table, so files in metadata table should
+      // always exist
+      do {
+        try {
+          if (dfv.getNumEntries() == 0) {
+            fs.deleteRecursively(tmpDatafile.path());
+          } else {
+            if (fs.exists(newDatafile.path())) {
+              log.warn("Target map file already exist " + newDatafile);
+              fs.deleteRecursively(newDatafile.path());
+            }
+            
+            if (!fs.rename(tmpDatafile.path(), newDatafile.path())) {
+              throw new IOException("rename fails");
+            }
+          }
+          break;
+        } catch (IOException ioe) {
+          log.warn("Tablet " + extent + " failed to rename " + newDatafile + " after MinC, will retry in 60 secs...", ioe);
+          UtilWaitThread.sleep(60 * 1000);
+        }
+      } while (true);
+      
+      long t1, t2;
+      
+      // the code below always assumes merged files are in use by scans... this must be done
+      // because the in-memory list of files is not updated until after the metadata table is;
+      // therefore the file is available to scans until memory is updated, but we want to ensure
+      // the file is not available for garbage collection... if memory were updated
+      // before this point (like major compactions do), then the following code could wait
+      // for scans to finish like major compactions do. We used to wait for scans to finish
+      // here, but that was incorrect because a scan could start after waiting but before
+      // memory was updated... assuming the file is always in use by scans leads to
+      // one unneeded metadata update when it was not actually in use
+      Set<FileRef> filesInUseByScans = Collections.emptySet();
+      if (absMergeFile != null)
+        filesInUseByScans = Collections.singleton(absMergeFile);
+      
+      // very important to write delete entries outside of log lock, because
+      // this !METADATA write does not go up... it goes sideways or to itself
+      if (absMergeFile != null)
+        MetadataTableUtil.addDeleteEntries(extent, Collections.singleton(absMergeFile), SystemCredentials.get());
+      
+      Set<String> unusedWalLogs = beginClearingUnusedLogs();
+      try {
+        // the order of writing to !METADATA and the walog is important in the face of machine/process
+        // failures: we need to write to !METADATA before writing to the walog; when things are done in
+        // the reverse order data could be lost... the minor compaction start event should be written
+        // before the following metadata write is made
+        
+        synchronized (timeLock) {
+          if (commitSession.getMaxCommittedTime() > persistedTime)
+            persistedTime = commitSession.getMaxCommittedTime();
+          
+          String time = tabletTime.getMetadataValue(persistedTime);
+          MasterMetadataUtil.updateTabletDataFile(extent, newDatafile, absMergeFile, dfv, time, SystemCredentials.get(), filesInUseByScans,
+              tabletServer.getClientAddressString(), tabletServer.getLock(), unusedWalLogs, lastLocation, flushId);
+        }
+        
+      } finally {
+        finishClearingUnusedLogs();
+      }
+      
+      do {
+        try {
+          // the purpose of making this update use the new commit session, instead of the old one passed in,
+          // is because the new one will reference the logs used by current memory...
+          
+          tabletServer.minorCompactionFinished(tabletMemory.getCommitSession(), newDatafile.toString(), commitSession.getWALogSeq() + 2);
+          break;
+        } catch (IOException e) {
+          log.error("Failed to write to write-ahead log " + e.getMessage() + " will retry", e);
+          UtilWaitThread.sleep(1 * 1000);
+        }
+      } while (true);
+      
+      synchronized (Tablet.this) {
+        lastLocation = null;
+        
+        t1 = System.currentTimeMillis();
+        if (datafileSizes.containsKey(newDatafile)) {
+          log.error("Adding file that is already in set " + newDatafile);
+        }
+        
+        if (dfv.getNumEntries() > 0) {
+          datafileSizes.put(newDatafile, dfv);
+        }
+        
+        if (absMergeFile != null) {
+          datafileSizes.remove(absMergeFile);
+        }
+        
+        unreserveMergingMinorCompactionFile(absMergeFile);
+        
+        dataSourceDeletions.incrementAndGet();
+        tabletMemory.finishedMinC();
+        
+        lastFlushID = flushId;
+        
+        computeNumEntries();
+        t2 = System.currentTimeMillis();
+      }
+      
+      // must do this after list of files in memory is updated above
+      removeFilesAfterScan(filesInUseByScans);
+      
+      if (absMergeFile != null)
+        log.log(TLevel.TABLET_HIST, extent + " MinC [" + absMergeFile + ",memory] -> " + newDatafile);
+      else
+        log.log(TLevel.TABLET_HIST, extent + " MinC [memory] -> " + newDatafile);
+      log.debug(String.format("MinC finish lock %.2f secs %s", (t2 - t1) / 1000.0, getExtent().toString()));
+      if (dfv.getSize() > acuTableConf.getMemoryInBytes(Property.TABLE_SPLIT_THRESHOLD)) {
+        log.debug(String.format("Minor Compaction wrote out file larger than split threshold.  split threshold = %,d  file size = %,d",
+            acuTableConf.getMemoryInBytes(Property.TABLE_SPLIT_THRESHOLD), dfv.getSize()));
+      }
+      
+    }
+    
+    public void reserveMajorCompactingFiles(Set<FileRef> files) {
+      if (majorCompactingFiles.size() != 0)
+        throw new IllegalStateException("Major compacting files not empty " + majorCompactingFiles);
+      
+      if (mergingMinorCompactionFile != null && files.contains(mergingMinorCompactionFile))
+        throw new IllegalStateException("Major compaction tried to resrve file in use by minor compaction " + mergingMinorCompactionFile);
+      
+      majorCompactingFiles.addAll(files);
+    }
+    
+    public void clearMajorCompactingFile() {
+      majorCompactingFiles.clear();
+    }
+    
+    void bringMajorCompactionOnline(Set<FileRef> oldDatafiles, FileRef tmpDatafile, FileRef newDatafile, Long compactionId, DataFileValue dfv)
+        throws IOException {
+      long t1, t2;
+      
+      if (!extent.isRootTablet()) {
+        
+        if (fs.exists(newDatafile.path())) {
+          log.error("Target map file already exist " + newDatafile, new Exception());
+          throw new IllegalStateException("Target map file already exist " + newDatafile);
+        }
+        
+        // rename before putting in metadata table, so files in metadata table should
+        // always exist
+        if (!fs.rename(tmpDatafile.path(), newDatafile.path()))
+          log.warn("Rename of " + tmpDatafile + " to " + newDatafile + " returned false");
+        
+        if (dfv.getNumEntries() == 0) {
+          fs.deleteRecursively(newDatafile.path());
+        }
+      }
+      
+      TServerInstance lastLocation = null;
+      synchronized (Tablet.this) {
+        
+        t1 = System.currentTimeMillis();
+        
+        IZooReaderWriter zoo = ZooReaderWriter.getRetryingInstance();
+        
+        dataSourceDeletions.incrementAndGet();
+        
+        if (extent.isRootTablet()) {
+          
+          waitForScansToFinish(oldDatafiles, true, Long.MAX_VALUE);
+          
+          try {
+            if (!zoo.isLockHeld(tabletServer.getLock().getLockID())) {
+              throw new IllegalStateException();
+            }
+          } catch (Exception e) {
+            throw new IllegalStateException("Can not bring major compaction online, lock not held", e);
+          }
+          
+          // mark files as ready for deletion, but
+          // do not delete them until we successfully
+          // rename the compacted map file, in case
+          // the system goes down
+          
+          String compactName = newDatafile.path().getName();
+          
+          for (FileRef ref : oldDatafiles) {
+            Path path = ref.path();
+            fs.rename(path, new Path(location + "/delete+" + compactName + "+" + path.getName()));
+          }
+          
+          if (fs.exists(newDatafile.path())) {
+            log.error("Target map file already exist " + newDatafile, new Exception());
+            throw new IllegalStateException("Target map file already exist " + newDatafile);
+          }
+          
+          if (!fs.rename(tmpDatafile.path(), newDatafile.path()))
+            log.warn("Rename of " + tmpDatafile + " to " + newDatafile + " returned false");
+          
+          // start deleting files, if we do not finish they will be cleaned
+          // up later
+          for (FileRef ref : oldDatafiles) {
+            Path path = ref.path();
+            Path deleteFile = new Path(location + "/delete+" + compactName + "+" + path.getName());
+            if (!fs.moveToTrash(deleteFile))
+              fs.deleteRecursively(deleteFile);
+          }
+        }
+        
+        // atomically remove old files and add new file
+        for (FileRef oldDatafile : oldDatafiles) {
+          if (!datafileSizes.containsKey(oldDatafile)) {
+            log.error("file does not exist in set " + oldDatafile);
+          }
+          datafileSizes.remove(oldDatafile);
+          majorCompactingFiles.remove(oldDatafile);
+        }
+        
+        if (datafileSizes.containsKey(newDatafile)) {
+          log.error("Adding file that is already in set " + newDatafile);
+        }
+        
+        if (dfv.getNumEntries() > 0) {
+          datafileSizes.put(newDatafile, dfv);
+        }
+        
+        // could be used by a follow-on compaction in a multi-pass compaction
+        majorCompactingFiles.add(newDatafile);
+        
+        computeNumEntries();
+        
+        lastLocation = Tablet.this.lastLocation;
+        Tablet.this.lastLocation = null;
+        
+        if (compactionId != null)
+          lastCompactID = compactionId;
+        
+        t2 = System.currentTimeMillis();
+      }
+      
+      if (!extent.isRootTablet()) {
+        Set<FileRef> filesInUseByScans = waitForScansToFinish(oldDatafiles, false, 10000);
+        if (filesInUseByScans.size() > 0)
+          log.debug("Adding scan refs to metadata " + extent + " " + filesInUseByScans);
+        MasterMetadataUtil.replaceDatafiles(extent, oldDatafiles, filesInUseByScans, newDatafile, compactionId, dfv, SystemCredentials.get(),
+            tabletServer.getClientAddressString(), lastLocation, tabletServer.getLock());
+        removeFilesAfterScan(filesInUseByScans);
+      }
+      
+      log.debug(String.format("MajC finish lock %.2f secs", (t2 - t1) / 1000.0));
+      log.log(TLevel.TABLET_HIST, extent + " MajC " + oldDatafiles + " --> " + newDatafile);
+    }
+    
+    public SortedMap<FileRef,DataFileValue> getDatafileSizes() {
+      synchronized (Tablet.this) {
+        TreeMap<FileRef,DataFileValue> copy = new TreeMap<FileRef,DataFileValue>(datafileSizes);
+        return Collections.unmodifiableSortedMap(copy);
+      }
+    }
+    
+    public Set<FileRef> getFiles() {
+      synchronized (Tablet.this) {
+        HashSet<FileRef> files = new HashSet<FileRef>(datafileSizes.keySet());
+        return Collections.unmodifiableSet(files);
+      }
+    }
+    
+  }
+  
+  public Tablet(TabletServer tabletServer, Text location, KeyExtent extent, TabletResourceManager trm, SortedMap<Key,Value> tabletsKeyValues)
+      throws IOException {
+    this(tabletServer, location, extent, trm, CachedConfiguration.getInstance(), tabletsKeyValues);
+    splitCreationTime = 0;
+  }
+  
+  public Tablet(TabletServer tabletServer, Text location, KeyExtent extent, TabletResourceManager trm, SortedMap<FileRef,DataFileValue> datafiles, String time,
+      long initFlushID, long initCompactID) throws IOException {
+    this(tabletServer, location, extent, trm, CachedConfiguration.getInstance(), datafiles, time, initFlushID, initCompactID);
+    splitCreationTime = System.currentTimeMillis();
+  }
+  
+  private Tablet(TabletServer tabletServer, Text location, KeyExtent extent, TabletResourceManager trm, Configuration conf,
+      SortedMap<Key,Value> tabletsKeyValues) throws IOException {
+    this(tabletServer, location, extent, trm, conf, VolumeManagerImpl.get(), tabletsKeyValues);
+  }
+  
+  static private final List<LogEntry> EMPTY = Collections.emptyList();
+  
+  private Tablet(TabletServer tabletServer, Text location, KeyExtent extent, TabletResourceManager trm, Configuration conf,
+      SortedMap<FileRef,DataFileValue> datafiles, String time, long initFlushID, long initCompactID) throws IOException {
+    this(tabletServer, location, extent, trm, conf, VolumeManagerImpl.get(), EMPTY, datafiles, time, null, new HashSet<FileRef>(), initFlushID, initCompactID);
+  }
+  
+  private static String lookupTime(AccumuloConfiguration conf, KeyExtent extent, SortedMap<Key,Value> tabletsKeyValues) {
+    SortedMap<Key,Value> entries;
+    
+    if (extent.isRootTablet()) {
+      return null;
+    } else {
+      entries = new TreeMap<Key,Value>();
+      Text rowName = extent.getMetadataEntry();
+      for (Entry<Key,Value> entry : tabletsKeyValues.entrySet()) {
+        if (entry.getKey().compareRow(rowName) == 0 && TabletsSection.ServerColumnFamily.TIME_COLUMN.hasColumns(entry.getKey())) {
+          entries.put(new Key(entry.getKey()), new Value(entry.getValue()));
+        }
+      }
+    }
+    
+    // log.debug("extent : "+extent+"   entries : "+entries);
+    
+    if (entries.size() == 1)
+      return entries.values().iterator().next().toString();
+    return null;
+  }
+  
+  private static SortedMap<FileRef,DataFileValue> lookupDatafiles(AccumuloConfiguration conf, VolumeManager fs, KeyExtent extent,
+      SortedMap<Key,Value> tabletsKeyValues) throws IOException {
+    
+    TreeMap<FileRef,DataFileValue> datafiles = new TreeMap<FileRef,DataFileValue>();
+    
+    if (extent.isRootTablet()) { // the meta0 tablet
+      Path location = new Path(ServerConstants.getRootTabletDir());
+      location = location.makeQualified(fs.getDefaultVolume());
+      // cleanUpFiles() has special handling for delete+ files
+      FileStatus[] files = fs.listStatus(location);
+      Collection<String> goodPaths = cleanUpFiles(fs, files, true);
+      for (String good : goodPaths) {
+        Path path = new Path(good);
+        String filename = path.getName();
+        FileRef ref = new FileRef(location.toString() + "/" + filename, path);
+        DataFileValue dfv = new DataFileValue(0, 0);
+        datafiles.put(ref, dfv);
+      }
+    } else {
+      
+      Text rowName = extent.getMetadataEntry();
+      
+      String tableId = extent.isMeta() ? RootTable.ID : MetadataTable.ID;
+      ScannerImpl mdScanner = new ScannerImpl(HdfsZooInstance.getInstance(), SystemCredentials.get(), tableId, Authorizations.EMPTY);
+      
+      // when no data file is present, each tablet will scan through the metadata table and return nothing;
+      // batch size was reduced to improve performance, then changed from 10 to 1000 after endKeys were implemented
+      mdScanner.setBatchSize(1000);
+      
+      // leave these in, again, now using endKey for safety
+      mdScanner.fetchColumnFamily(DataFileColumnFamily.NAME);
+      
+      mdScanner.setRange(new Range(rowName));
+      
+      for (Entry<Key,Value> entry : mdScanner) {
+        
+        if (entry.getKey().compareRow(rowName) != 0) {
+          break;
+        }
+        
+        FileRef ref = new FileRef(entry.getKey().getColumnQualifier().toString(), fs.getFullPath(entry.getKey()));
+        datafiles.put(ref, new DataFileValue(entry.getValue().get()));
+      }
+    }
+    return datafiles;
+  }
+  
+  private static List<LogEntry> lookupLogEntries(KeyExtent ke, SortedMap<Key,Value> tabletsKeyValues) {
+    List<LogEntry> logEntries = new ArrayList<LogEntry>();
+    
+    if (ke.isMeta()) {
+      try {
+        logEntries = MetadataTableUtil.getLogEntries(SystemCredentials.get(), ke);
+      } catch (Exception ex) {
+        throw new RuntimeException("Unable to read tablet log entries", ex);
+      }
+    } else {
+      log.debug("Looking at metadata " + tabletsKeyValues);
+      Text row = ke.getMetadataEntry();
+      for (Entry<Key,Value> entry : tabletsKeyValues.entrySet()) {
+        Key key = entry.getKey();
+        if (key.getRow().equals(row)) {
+          if (key.getColumnFamily().equals(LogColumnFamily.NAME)) {
+            logEntries.add(MetadataTableUtil.entryFromKeyValue(key, entry.getValue()));
+          }
+        }
+      }
+    }
+    
+    log.debug("got " + logEntries + " for logs for " + ke);
+    return logEntries;
+  }
+  
+  private static Set<FileRef> lookupScanFiles(KeyExtent extent, SortedMap<Key,Value> tabletsKeyValues, VolumeManager fs) throws IOException {
+    HashSet<FileRef> scanFiles = new HashSet<FileRef>();
+    
+    Text row = extent.getMetadataEntry();
+    for (Entry<Key,Value> entry : tabletsKeyValues.entrySet()) {
+      Key key = entry.getKey();
+      if (key.getRow().equals(row) && key.getColumnFamily().equals(ScanFileColumnFamily.NAME)) {
+        String meta = key.getColumnQualifier().toString();
+        Path path = fs.getFullPath(ServerConstants.getTablesDirs(), meta);
+        scanFiles.add(new FileRef(meta, path));
+      }
+    }
+    
+    return scanFiles;
+  }
+  
+  private static long lookupFlushID(KeyExtent extent, SortedMap<Key,Value> tabletsKeyValues) {
+    Text row = extent.getMetadataEntry();
+    for (Entry<Key,Value> entry : tabletsKeyValues.entrySet()) {
+      Key key = entry.getKey();
+      if (key.getRow().equals(row) && TabletsSection.ServerColumnFamily.FLUSH_COLUMN.equals(key.getColumnFamily(), key.getColumnQualifier()))
+        return Long.parseLong(entry.getValue().toString());
+    }
+    
+    return -1;
+  }
+  
+  private static long lookupCompactID(KeyExtent extent, SortedMap<Key,Value> tabletsKeyValues) {
+    Text row = extent.getMetadataEntry();
+    for (Entry<Key,Value> entry : tabletsKeyValues.entrySet()) {
+      Key key = entry.getKey();
+      if (key.getRow().equals(row) && TabletsSection.ServerColumnFamily.COMPACT_COLUMN.equals(key.getColumnFamily(), key.getColumnQualifier()))
+        return Long.parseLong(entry.getValue().toString());
+    }
+    
+    return -1;
+  }
+  
+  private Tablet(TabletServer tabletServer, Text location, KeyExtent extent, TabletResourceManager trm, Configuration conf, VolumeManager fs,
+      SortedMap<Key,Value> tabletsKeyValues) throws IOException {
+    this(tabletServer, location, extent, trm, conf, fs, lookupLogEntries(extent, tabletsKeyValues), lookupDatafiles(tabletServer.getSystemConfiguration(), fs,
+        extent, tabletsKeyValues), lookupTime(tabletServer.getSystemConfiguration(), extent, tabletsKeyValues), lookupLastServer(extent, tabletsKeyValues),
+        lookupScanFiles(extent, tabletsKeyValues, fs), lookupFlushID(extent, tabletsKeyValues), lookupCompactID(extent, tabletsKeyValues));
+  }
+  
+  private static TServerInstance lookupLastServer(KeyExtent extent, SortedMap<Key,Value> tabletsKeyValues) {
+    for (Entry<Key,Value> entry : tabletsKeyValues.entrySet()) {
+      if (entry.getKey().getColumnFamily().compareTo(TabletsSection.LastLocationColumnFamily.NAME) == 0) {
+        return new TServerInstance(entry.getValue(), entry.getKey().getColumnQualifier());
+      }
+    }
+    return null;
+  }
+  
+  /**
+   * Yet another constructor - this one lets us avoid costly lookups into the metadata table when we already know the files we need, e.g. at split time.
+   */
+  private Tablet(final TabletServer tabletServer, final Text location, final KeyExtent extent, final TabletResourceManager trm, final Configuration conf,
+      final VolumeManager fs, final List<LogEntry> logEntries, final SortedMap<FileRef,DataFileValue> datafiles, String time,
+      final TServerInstance lastLocation, Set<FileRef> scanFiles, long initFlushID, long initCompactID) throws IOException {
+    if (location.find(":") >= 0) {
+      this.location = new Path(location.toString());
+    } else {
+      this.location = new Path(ServerConstants.getTablesDirs()[0] + "/" + extent.getTableId().toString() + location.toString());
+    }
+    this.location = this.location.makeQualified(fs.getFileSystemByPath(this.location));
+    this.lastLocation = lastLocation;
+    this.tabletDirectory = location.toString();
+    this.conf = conf;
+    this.acuTableConf = tabletServer.getTableConfiguration(extent);
+    
+    this.fs = fs;
+    this.extent = extent;
+    this.tabletResources = trm;
+    
+    this.lastFlushID = initFlushID;
+    this.lastCompactID = initCompactID;
+    
+    if (extent.isRootTablet()) {
+      
+      long rtime = Long.MIN_VALUE;
+      for (FileRef ref : datafiles.keySet()) {
+        Path path = ref.path();
+        FileSystem ns = fs.getFileSystemByPath(path);
+        FileSKVIterator reader = FileOperations.getInstance().openReader(path.toString(), true, ns, ns.getConf(), tabletServer.getTableConfiguration(extent));
+        long maxTime = -1;
+        try {
+          
+          while (reader.hasTop()) {
+            maxTime = Math.max(maxTime, reader.getTopKey().getTimestamp());
+            reader.next();
+          }
+          
+        } finally {
+          reader.close();
+        }
+        
+        if (maxTime > rtime) {
+          time = TabletTime.LOGICAL_TIME_ID + "" + maxTime;
+          rtime = maxTime;
+        }
+      }
+    }
+    
+    this.tabletServer = tabletServer;
+    this.logId = tabletServer.createLogId(extent);
+    
+    this.timer = new TabletStatsKeeper();
+    
+    setupDefaultSecurityLabels(extent);
+    
+    tabletMemory = new TabletMemory();
+    tabletTime = TabletTime.getInstance(time);
+    persistedTime = tabletTime.getTime();
+    
+    acuTableConf.addObserver(configObserver = new ConfigurationObserver() {
+      
+      private void reloadConstraints() {
+        constraintChecker.set(new ConstraintChecker(getTableConfiguration()));
+      }
+      
+      @Override
+      public void propertiesChanged() {
+        reloadConstraints();
+        
+        try {
+          setupDefaultSecurityLabels(extent);
+        } catch (Exception e) {
+          log.error("Failed to reload default security labels for extent: " + extent.toString());
+        }
+      }
+      
+      @Override
+      public void propertyChanged(String prop) {
+        if (prop.startsWith(Property.TABLE_CONSTRAINT_PREFIX.getKey()))
+          reloadConstraints();
+        else if (prop.equals(Property.TABLE_DEFAULT_SCANTIME_VISIBILITY.getKey())) {
+          try {
+            log.info("Default security labels changed for extent: " + extent.toString());
+            setupDefaultSecurityLabels(extent);
+          } catch (Exception e) {
+            log.error("Failed to reload default security labels for extent: " + extent.toString());
+          }
+        }
+        
+      }
+      
+      @Override
+      public void sessionExpired() {
+        log.debug("Session expired, no longer updating per table props...");
+      }
+      
+    });
+    // Force a load of any per-table properties
+    configObserver.propertiesChanged();
+    
+    tabletResources.setTablet(this, acuTableConf);
+    
+    if (!logEntries.isEmpty()) {
+      log.info("Starting Write-Ahead Log recovery for " + this.extent);
+      final long[] count = new long[2];
+      final CommitSession commitSession = tabletMemory.getCommitSession();
+      count[1] = Long.MIN_VALUE;
+      try {
+        Set<String> absPaths = new HashSet<String>();
+        for (FileRef ref : datafiles.keySet())
+          absPaths.add(ref.path().toString());
+        
+        tabletServer.recover(this.tabletServer.getFileSystem(), this, logEntries, absPaths, new MutationReceiver() {
+          @Override
+          public void receive(Mutation m) {
+            // LogReader.printMutation(m);
+            Collection<ColumnUpdate> muts = m.getUpdates();
+            for (ColumnUpdate columnUpdate : muts) {
+              if (!columnUpdate.hasTimestamp()) {
+                // if it is not a user set timestamp, it must have been set
+                // by the system
+                count[1] = Math.max(count[1], columnUpdate.getTimestamp());
+              }
+            }
+            tabletMemory.mutate(commitSession, Collections.singletonList(m));
+            count[0]++;
+          }
+        });
+        
+        if (count[1] != Long.MIN_VALUE) {
+          tabletTime.useMaxTimeFromWALog(count[1]);
+        }
+        commitSession.updateMaxCommittedTime(tabletTime.getTime());
+        
+        tabletMemory.updateMemoryUsageStats();
+        
+        if (count[0] == 0) {
+          MetadataTableUtil.removeUnusedWALEntries(extent, logEntries, tabletServer.getLock());
+          logEntries.clear();
+        }
+        
+      } catch (Throwable t) {
+        if (acuTableConf.getBoolean(Property.TABLE_FAILURES_IGNORE)) {
+          log.warn("Error recovering from log files: ", t);
+        } else {
+          throw new RuntimeException(t);
+        }
+      }
+      // make some closed references that represent the recovered logs
+      currentLogs = new HashSet<DfsLogger>();
+      for (LogEntry logEntry : logEntries) {
+        for (String log : logEntry.logSet) {
+          String[] parts = log.split("/", 2);
+          Path file = fs.getFullPath(ServerConstants.getWalDirs(), parts[1]);
+          currentLogs.add(new DfsLogger(tabletServer.getServerConfig(), logEntry.server, file));
+        }
+      }
+      
+      log.info("Write-Ahead Log recovery complete for " + this.extent + " (" + count[0] + " mutations applied, " + tabletMemory.getNumEntries()
+          + " entries created)");
+    }
+    
+    String contextName = acuTableConf.get(Property.TABLE_CLASSPATH);
+    if (contextName != null && !contextName.equals("")) {
+      // initialize context classloader, instead of possibly waiting for it to initialize for a scan
+      // TODO this could hang, causing other tablets to fail to load - ACCUMULO-1292
+      AccumuloVFSClassLoader.getContextManager().getClassLoader(contextName);
+    }
+    
+    // do this last after tablet is completely setup because it
+    // could cause major compaction to start
+    datafileManager = new DatafileManager(datafiles);
+    
+    computeNumEntries();
+    
+    datafileManager.removeFilesAfterScan(scanFiles);
+    
+    log.log(TLevel.TABLET_HIST, extent + " opened ");
+  }
+  
+  private void setupDefaultSecurityLabels(KeyExtent extent) {
+    if (extent.isMeta()) {
+      defaultSecurityLabel = new byte[0];
+    } else {
+      try {
+        ColumnVisibility cv = new ColumnVisibility(acuTableConf.get(Property.TABLE_DEFAULT_SCANTIME_VISIBILITY));
+        this.defaultSecurityLabel = cv.getExpression();
+      } catch (Exception e) {
+        log.error(e, e);
+        this.defaultSecurityLabel = new byte[0];
+      }
+    }
+  }
+  
+  private static Collection<String> cleanUpFiles(VolumeManager fs, FileStatus[] files, boolean deleteTmp) throws IOException {
+    /*
+     * called in constructor and before major compactions
+     */
+    Collection<String> goodFiles = new ArrayList<String>(files.length);
+    
+    for (FileStatus file : files) {
+      
+      String path = file.getPath().toString();
+      String filename = file.getPath().getName();
+      
+      // check for incomplete major compaction, this should only occur
+      // for root tablet
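+      // such markers are named delete+<expected compacted file>+<original file name>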
+      if (filename.startsWith("delete+")) {
+        String expectedCompactedFile = path.substring(0, path.lastIndexOf("/delete+")) + "/" + filename.split("\\+")[1];
+        if (fs.exists(new Path(expectedCompactedFile))) {
+          // compaction finished, but did not finish deleting the compacted files... so delete it
+          if (!fs.deleteRecursively(file.getPath()))
+            log.warn("Delete of file: " + file.getPath().toString() + " returned false");
+          continue;
+        }
+        // compaction did not finish, so put files back
+        
+        // reset path and filename for rest of loop
+        filename = filename.split("\\+", 3)[2];
+        path = path.substring(0, path.lastIndexOf("/delete+")) + "/" + filename;
+        
+        if (!fs.rename(file.getPath(), new Path(path)))
+          log.warn("Rename of " + file.getPath().toString() + " to " + path + " returned false");
+      }
+      
+      if (filename.endsWith("_tmp")) {
+        if (deleteTmp) {
+          log.warn("cleaning up old tmp file: " + path);
+          if (!fs.deleteRecursively(file.getPath()))
+            log.warn("Delete of tmp file: " + file.getPath().toString() + " return false");
+          
+        }
+        continue;
+      }
+      
+      if (!filename.startsWith(Constants.MAPFILE_EXTENSION + "_") && !FileOperations.getValidExtensions().contains(filename.split("\\.")[1])) {
+        log.error("unknown file in tablet" + path);
+        continue;
+      }
+      
+      goodFiles.add(path);
+    }
+    
+    return goodFiles;
+  }
+  
+  public static class KVEntry extends KeyValue {
+    public KVEntry(Key k, Value v) {
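+      // deep copy the key and value so this entry stays valid after the source iterator advances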
+      super(new Key(k), Arrays.copyOf(v.get(), v.get().length));
+    }
+    
+    @Override
+    public String toString() {
+      return key.toString() + "=" + getValue();
+    }
+    
+    int numBytes() {
+      return key.getSize() + getValue().get().length;
+    }
+    
+    int estimateMemoryUsed() {
+      return key.getSize() + getValue().get().length + (9 * 32); // overhead is 32 per object
+    }
+  }
+  
+  private LookupResult lookup(SortedKeyValueIterator<Key,Value> mmfi, List<Range> ranges, HashSet<Column> columnSet, ArrayList<KVEntry> results,
+      long maxResultsSize) throws IOException {
+    
+    LookupResult lookupResult = new LookupResult();
+    
+    boolean exceededMemoryUsage = false;
+    boolean tabletClosed = false;
+    
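+    // if specific columns were requested, derive their column family set so
+    // seeks can restrict to the matching families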
+    Set<ByteSequence> cfset = null;
+    if (columnSet.size() > 0)
+      cfset = LocalityGroupUtil.families(columnSet);
+    
+    for (Range range : ranges) {
+      
+      if (exceededMemoryUsage || tabletClosed) {
+        lookupResult.unfinishedRanges.add(range);
+        continue;
+      }
+      
+      int entriesAdded = 0;
+      
+      try {
+        if (cfset != null)
+          mmfi.seek(range, cfset, true);
+        else
+          mmfi.seek(range, LocalityGroupUtil.EMPTY_CF_SET, false);
+        
+        while (mmfi.hasTop()) {
+          Key key = mmfi.getTopKey();
+          
+          KVEntry kve = new KVEntry(key, mmfi.getTopValue());
+          results.add(kve);
+          entriesAdded++;
+          lookupResult.bytesAdded += kve.estimateMemoryUsed();
+          lookupResult.dataSize += kve.numBytes();
+          
+          exceededMemoryUsage = lookupResult.bytesAdded > maxResultsSize;
+          
+          if (exceededMemoryUsage) {
+            addUnfinishedRange(lookupResult, range, key, false);
+            break;
+          }
+          
+          mmfi.next();
+        }
+        
+      } catch (TooManyFilesException tmfe) {
+        // treat this as a closed tablet, and let the client retry
+        log.warn("Tablet " + getExtent() + " has too many files, batch lookup can not run");
+        handleTabletClosedDuringScan(results, lookupResult, exceededMemoryUsage, range, entriesAdded);
+        tabletClosed = true;
+      } catch (IOException ioe) {
+        if (shutdownInProgress()) {
+          // assume HDFS shutdown hook caused this exception
+          log.debug("IOException while shutdown in progress ", ioe);
+          handleTabletClosedDuringScan(results, lookupResult, exceededMemoryUsage, range, entriesAdded);
+          tabletClosed = true;
+        } else {
+          throw ioe;
+        }
+      } catch (IterationInterruptedException iie) {
+        if (isClosed()) {
+          handleTabletClosedDuringScan(results, lookupResult, exceededMemoryUsage, range, entriesAdded);
+          tabletClosed = true;
+        } else {
+          throw iie;
+        }
+      } catch (TabletClosedException tce) {
+        handleTabletClosedDuringScan(results, lookupResult, exceededMemoryUsage, range, entriesAdded);
+        tabletClosed = true;
+      }
+      
+    }
+    
+    return lookupResult;
+  }
+  
+  private void handleTabletClosedDuringScan(ArrayList<KVEntry> results, LookupResult lookupResult, boolean exceededMemoryUsage, Range range, int entriesAdded) {
+    if (exceededMemoryUsage)
+      throw new IllegalStateException("tablet should either exceed memory usage or close, not both");
+    
+    if (entriesAdded > 0)
+      addUnfinishedRange(lookupResult, range, results.get(results.size() - 1).key, false);
+    else
+      lookupResult.unfinishedRanges.add(range);
+    
+    lookupResult.closed = true;
+  }
+  
+  private void addUnfinishedRange(LookupResult lookupResult, Range range, Key key, boolean inclusiveStartKey) {
+    if (range.getEndKey() == null || key.compareTo(range.getEndKey()) < 0) {
+      Range nlur = new Range(new Key(key), inclusiveStartKey, range.getEndKey(), range.isEndKeyInclusive());
+      lookupResult.unfinishedRanges.add(nlur);
+    }
+  }
+  
+  public static interface KVReceiver {
+    void receive(List<KVEntry> matches) throws IOException;
+  }
+  
+  class LookupResult {
+    List<Range> unfinishedRanges = new ArrayList<Range>();
+    long bytesAdded = 0;
+    long dataSize = 0;
+    boolean closed = false;
+  }
+  
+  public LookupResult lookup(List<Range> ranges, HashSet<Column> columns, Authorizations authorizations, ArrayList<KVEntry> results, long maxResultSize,
+      List<IterInfo> ssiList, Map<String,Map<String,String>> ssio, AtomicBoolean interruptFlag) throws IOException {
+    
+    if (ranges.size() == 0) {
+      return new LookupResult();
+    }
+    
+    ranges = Range.mergeOverlapping(ranges);
+    Collections.sort(ranges);
+    
+    Range tabletRange = extent.toDataRange();
+    for (Range range : ranges) {
+      // test whether this range falls within the tablet; if it does not,
+      // clip will throw an exception
+      tabletRange.clip(range);
+    }
+    
+    ScanDataSource dataSource = new ScanDataSource(authorizations, this.defaultSecurityLabel, columns, ssiList, ssio, interruptFlag);
+    
+    LookupResult result = null;
+    
+    try {
+      SortedKeyValueIterator<Key,Value> iter = new SourceSwitchingIterator(dataSource);
+      result = lookup(iter, ranges, columns, results, maxResultSize);
+      return result;
+    } catch (IOException ioe) {
+      dataSource.close(true);
+      throw ioe;
+    } finally {
+      // code in finally block because always want
+      // to return mapfiles, even when exception is thrown
+      dataSource.close(false);
+      
+      synchronized (this) {
+        queryCount += results.size();
+        if (result != null)
+          queryBytes += result.dataSize;
+      }
+    }
+  }
+  
+  private Batch nextBatch(SortedKeyValueIterator<Key,Value> iter, Range range, int num, Set<Column> columns) throws IOException {
+    
+    // log.info("In nextBatch..");
+    
+    List<KVEntry> results = new ArrayList<KVEntry>();
+    Key key = null;
+    
+    Value value;
+    long resultSize = 0L;
+    long resultBytes = 0L;
+    
+    long maxResultsSize = acuTableConf.getMemoryInBytes(Property.TABLE_SCAN_MAXMEM);
+    
+    if (columns.size() == 0) {
+      iter.seek(range, LocalityGroupUtil.EMPTY_CF_SET, false);
+    } else {
+      iter.seek(range, LocalityGroupUtil.families(columns), true);
+    }
+    
+    Key continueKey = null;
+    boolean skipContinueKey = false;
+    
+    boolean endOfTabletReached = false;
+    while (iter.hasTop()) {
+      
+      value = iter.getTopValue();
+      key = iter.getTopKey();
+      
+      KVEntry kvEntry = new KVEntry(key, value); // copies key and value
+      results.add(kvEntry);
+      resultSize += kvEntry.estimateMemoryUsed();
+      resultBytes += kvEntry.numBytes();
+      
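+      // when the batch fills up, remember the last key returned so the next
+      // batch can resume just past it (skipContinueKey causes it to be skipped)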
+      if (resultSize >= maxResultsSize || results.size() >= num) {
+        continueKey = new Key(key);
+        skipContinueKey = true;
+        break;
+      }
+      
+      iter.next();
+    }
+    
+    if (!iter.hasTop()) {
+      endOfTabletReached = true;
+    }
+    
+    Batch retBatch = new Batch();
+    retBatch.numBytes = resultBytes;
+    
+    if (!endOfTabletReached) {
+      retBatch.continueKey = continueKey;
+      retBatch.skipContinueKey = skipContinueKey;
+    } else {
+      retBatch.continueKey = null;
+    }
+    
+    if (endOfTabletReached && results.size() == 0)
+      retBatch.results = null;
+    else
+      retBatch.results = results;
+    
+    return retBatch;
+  }
+  
+  /**
+   * Determine if a JVM shutdown is in progress.
+   * 
+   */
+  private boolean shutdownInProgress() {
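+    // Runtime.removeShutdownHook throws IllegalStateException once the JVM has
+    // begun shutting down, so removing a never-added dummy hook is a
+    // side-effect-free way to probe for shutdown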
+    try {
+      Runtime.getRuntime().removeShutdownHook(new Thread(new Runnable() {
+        @Override
+        public void run() {}
+      }));
+    } catch (IllegalStateException ise) {
+      return true;
+    }
+    
+    return false;
+  }
+  
+  private class Batch {
+    public boolean skipContinueKey;
+    public List<KVEntry> results;
+    public Key continueKey;
+    public long numBytes;
+  }
+  
+  Scanner createScanner(Range range, int num, Set<Column> columns, Authorizations authorizations, List<IterInfo> ssiList, Map<String,Map<String,String>> ssio,
+      boolean isolated, AtomicBoolean interruptFlag) {
+    // do a test to see if this range falls within the tablet; if it does not,
+    // clip will throw an exception
+    extent.toDataRange().clip(range);
+    
+    ScanOptions opts = new ScanOptions(num, authorizations, this.defaultSecurityLabel, columns, ssiList, ssio, interruptFlag, isolated);
+    return new Scanner(range, opts);
+  }
+  
+  class ScanBatch {
+    boolean more;
+    List<KVEntry> results;
+    
+    ScanBatch(List<KVEntry> results, boolean more) {
+      this.results = results;
+      this.more = more;
+    }
+  }
+  
+  class Scanner {
+    
+    private ScanOptions options;
+    private Range range;
+    private SortedKeyValueIterator<Key,Value> isolatedIter;
+    private ScanDataSource isolatedDataSource;
+    private boolean sawException = false;
+    private boolean scanClosed = false;
+    
+    Scanner(Range range, ScanOptions options) {
+      this.range = range;
+      this.options = options;
+    }
+    
+    synchronized ScanBatch read() throws IOException, TabletClosedException {
+      
+      if (sawException)
+        throw new IllegalStateException("Tried to use scanner after exception occurred.");
+      
+      if (scanClosed)
+        throw new IllegalStateException("Tried to use scanner after it was closed.");
+      
+      Batch results = null;
+      
+      ScanDataSource dataSource;
+      
+      if (options.isolated) {
+        if (isolatedDataSource == null)
+          isolatedDataSource = new ScanDataSource(options);
+        dataSource = isolatedDataSource;
+      } else {
+        dataSource = new ScanDataSource(options);
+      }
+      
+      try {
+        
+        SortedKeyValueIterator<Key,Value> iter;
+        
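+        // isolated scans reuse one data source and iterator across read() calls,
+        // reattaching the file manager each time; non-isolated scans create a
+        // fresh source-switching iterator per batch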
+        if (options.isolated) {
+          if (isolatedIter == null)
+            isolatedIter = new SourceSwitchingIterator(dataSource, true);
+          else
+            isolatedDataSource.fileManager.reattach();
+          iter = isolatedIter;
+        } else {
+          iter = new SourceSwitchingIterator(dataSource, false);
+        }
+        
+        results = nextBatch(iter, range, options.num, options.columnSet);
+        
+        if (results.results == null) {
+          range = null;
+          return new ScanBatch(new ArrayList<Tablet.KVEntry>(), false);
+        } else if (results.continueKey == null) {
+          return new ScanBatch(results.results, false);
+        } else {
+          range = new Range(results.continueKey, !results.skipContinueKey, range.getEndKey(), range.isEndKeyInclusive());
+          return new ScanBatch(results.results, true);
+        }
+        
+      } catch (IterationInterruptedException iie) {
+        sawException = true;
+        if (isClosed())
+          throw new TabletClosedException(iie);
+        else
+          throw iie;
+      } catch (IOException ioe) {
+        if (shutdownInProgress()) {
+          log.debug("IOException while shutdown in progress ", ioe);
+          throw new TabletClosedException(ioe); // assume IOException was caused by execution of HDFS shutdown hook
+        }
+        
+        sawException = true;
+        dataSource.close(true);
+        throw ioe;
+      } catch (RuntimeException re) {
+        sawException = true;
+        throw re;
+      } finally {
+        // code in finally block because always want
+        // to return mapfiles, even when exception is thrown
+        if (!options.isolated)
+          dataSource.close(false);
+        else if (dataSource.fileManager != null)
+          dataSource.fileManager.detach();
+        
+        synchronized (Tablet.this) {
+          if (results != null && results.results != null) {
+            long more = results.results.size();
+            queryCount += more;
+            queryBytes += results.numBytes;
+          }
+        }
+      }
+    }
+    
+    // close and read are synchronized because close cannot be called on the data source while it is in use
+    // doing so could lead to the case where file iterators that are in use by a thread are returned
+    // to the pool... this would be bad
+    void close() {
+      options.interruptFlag.set(true);
+      synchronized (this) {
+        scanClosed = true;
+        if (isolatedDataSource != null)
+          isolatedDataSource.close(false);
+      }
+    }
+  }
+  
+  static class ScanOptions {
+    
+    // scan options
+    Authorizations authorizations;
+    byte[] defaultLabels;
+    Set<Column> columnSet;
+    List<IterInfo> ssiList;
+    Map<String,Map<String,String>> ssio;
+    AtomicBoolean interruptFlag;
+    int num;
+    boolean isolated;
+    
+    ScanOptions(int num, Authorizations authorizations, byte[] defaultLabels, Set<Column> columnSet, List<IterInfo> ssiList,
+        Map<String,Map<String,String>> ssio, AtomicBoolean interruptFlag, boolean isolated) {
+      this.num = num;
+      this.authorizations = authorizations;
+      this.defaultLabels = defaultLabels;
+      this.columnSet = columnSet;
+      this.ssiList = ssiList;
+      this.ssio = ssio;
+      this.interruptFlag = interruptFlag;
+      this.isolated = isolated;
+    }
+    
+  }
+  
+  class ScanDataSource implements DataSource {
+    
+    // data source state
+    private ScanFileManager fileManager;
+    private SortedKeyValueIterator<Key,Value> iter;
+    private long expectedDeletionCount;
+    private List<MemoryIterator> memIters = null;
+    private long fileReservationId;
+    private AtomicBoolean interruptFlag;
+    private StatsIterator statsIterator;
+    
+    ScanOptions options;
+    
+    ScanDataSource(Authorizations authorizations, byte[] defaultLabels, HashSet<Column> columnSet, List<IterInfo> ssiList, Map<String,Map<String,String>> ssio,
+        AtomicBoolean interruptFlag) {
+      expectedDeletionCount = dataSourceDeletions.get();
+      this.options = new ScanOptions(-1, authorizations, defaultLabels, columnSet, ssiList, ssio, interruptFlag, false);
+      this.interruptFlag = interruptFlag;
+    }
+    
+    ScanDataSource(ScanOptions options) {
+      expectedDeletionCount = dataSourceDeletions.get();
+      this.options = options;
+      this.interruptFlag = options.interruptFlag;
+    }
+    
+    @Override
+    public DataSource getNewDataSource() {
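+      // if the underlying files or in-memory map changed since this data source
+      // was created (tracked via dataSourceDeletions), release the stale memory
+      // iterators and file handles; the next iterator() call re-resolves them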
+      if (!isCurrent()) {
+        // log.debug("Switching data sources during a scan");
+        if (memIters != null) {
+          tabletMemory.returnIterators(memIters);
+          memIters = null;
+          datafileManager.returnFilesForScan(fileReservationId);
+          fileReservationId = -1;
+        }
+        
+        if (fileManager != null)
+          fileManager.releaseOpenFiles(false);
+        
+        expectedDeletionCount = dataSourceDeletions.get();
+        iter = null;
+        
+      }
+      
+      return this;
+    }
+    
+    @Override
+    public boolean isCurrent() {
+      return expectedDeletionCount == dataSourceDeletions.get();
+    }
+    
+    @Override
+    public SortedKeyValueIterator<Key,Value> iterator() throws IOException {
+      if (iter == null)
+        iter = createIterator();
+      return iter;
+    }
+    
+    private SortedKeyValueIterator<Key,Value> createIterator() throws IOException {
+      
+      Map<FileRef,DataFileValue> files;
+      
+      synchronized (Tablet.this) {
+        
+        if (memIters != null)
+          throw new IllegalStateException("Tried to create new scan iterator w/o releasing memory");
+        
+        if (Tablet.this.closed)
+          throw new TabletClosedException();
+        
+        if (interruptFlag.get())
+          throw new IterationInterruptedException(extent.toString() + " " + interruptFlag.hashCode());
+        
+        // only acquire the file manager when we know the tablet is open
+        if (fileManager == null) {
+          fileManager = tabletResources.newScanFileManager();
+          activeScans.add(this);
+        }
+        
+        if (fileManager.getNumOpenFiles() != 0)
+          throw new IllegalStateException("Tried to create new scan iterator w/o releasing files");
+        
+        // set this before trying to get iterators in case
+        // getIterators() throws an exception
+        expectedDeletionCount = dataSourceDeletions.get();
+        
+        memIters = tabletMemory.getIterators();
+        Pair<Long,Map<FileRef,DataFileValue>> reservation = datafileManager.reserveFilesForScan();
+        fileReservationId = reservation.getFirst();
+        files = reservation.getSecond();
+      }
+      
+      Collection<InterruptibleIterator> mapfiles = fileManager.openFiles(files, options.isolated);
+      
+      List<SortedKeyValueIterator<Key,Value>> iters = new ArrayList<SortedKeyValueIterator<Key,Value>>(mapfiles.size() + memIters.size());
+      
+      iters.addAll(mapfiles);
+      iters.addAll(memIters);
+      
+      for (SortedKeyValueIterator<Key,Value> skvi : iters)
+        ((InterruptibleIterator) skvi).setInterruptFlag(interruptFlag);
+      
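+      // iterator stack, bottom to top: merged memory + file data, stats tracking,
+      // delete handling, column family skipping, column qualifier filtering,
+      // visibility filtering, then any configured scan-time iterators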
+      MultiIterator multiIter = new MultiIterator(iters, extent);
+      
+      TabletIteratorEnvironment iterEnv = new TabletIteratorEnvironment(IteratorScope.scan, acuTableConf, fileManager, files);
+      
+      statsIterator = new StatsIterator(multiIter, TabletServer.seekCount, scannedCount);
+      
+      DeletingIterator delIter = new DeletingIterator(statsIterator, false);
+      
+      ColumnFamilySkippingIterator cfsi = new ColumnFamilySkippingIterator(delIter);
+      
+      ColumnQualifierFilter colFilter = new ColumnQualifierFilter(cfsi, options.columnSet);
+      
+      VisibilityFilter visFilter = new VisibilityFilter(colFilter, options.authorizations, options.defaultLabels);
+      
+      return iterEnv.getTopLevelIterator(IteratorUtil
+          .loadIterators(IteratorScope.scan, visFilter, extent, acuTableConf, options.ssiList, options.ssio, iterEnv));
+    }
+    
+    private void close(boolean sawErrors) {
+      
+      if (memIters != null) {
+        tabletMemory.returnIterators(memIters);
+        memIters = null;
+        datafileManager.returnFilesForScan(fileReservationId);
+        fileReservationId = -1;
+      }
+      
+      synchronized (Tablet.this) {
+        activeScans.remove(this);
+        if (activeScans.size() == 0)
+          Tablet.this.notifyAll();
+      }
+      
+      if (fileManager != null) {
+        fileManager.releaseOpenFiles(sawErrors);
+        fileManager = null;
+      }
+      
+      if (statsIterator != null) {
+        statsIterator.report();
+      }
+      
+    }
+    
+    public void interrupt() {
+      interruptFlag.set(true);
+    }
+    
+    @Override
+    public DataSource getDeepCopyDataSource(IteratorEnvironment env) {
+      throw new UnsupportedOperationException();
+    }
+    
+  }
+  
+  private DataFileValue minorCompact(Configuration conf, VolumeManager fs, InMemoryMap memTable, FileRef tmpDatafile, FileRef newDatafile, FileRef mergeFile,
+      boolean hasQueueTime, long queued, CommitSession commitSession, long flushId, MinorCompactionReason mincReason) {
+    boolean failed = false;
+    long start = System.currentTimeMillis();
+    timer.incrementStatusMinor();
+    
+    long count = 0;
+    
+    try {
+      Span span = Trace.start("write");
+      CompactionStats stats;
+      try {
+        count = memTable.getNumEntries();
+        
+        DataFileValue dfv = null;
+        if (mergeFile != null)
+          dfv = datafileManager.getDatafileSizes().get(mergeFile);
+        
+        MinorCompactor compactor = new MinorCompactor(conf, fs, memTable, mergeFile, dfv, tmpDatafile, acuTableConf, extent, mincReason);
+        stats = compactor.call();
+      } finally {
+        span.stop();
+      }
+      span = Trace.start("bringOnline");
+      try {
+        datafileManager.bringMinorCompactionOnline(tmpDatafile, newDatafile, mergeFile, new DataFileValue(stats.getFileSize(), stats.getEntriesWritten()),
+            commitSession, flushId);
+      } finally {
+        span.stop();
+      }
+      return new DataFileValue(stats.getFileSize(), stats.getEntriesWritten());
+    } catch (Exception e) {
+      failed = true;
+      throw new RuntimeException(e);
+    } catch (Error e) {
+      // Weird errors like "OutOfMemoryError" when trying to create the thread for the compaction
+      failed = true;
+      throw new RuntimeException(e);
+    } finally {
+      try {
+        tabletMemory.finalizeMinC();
+      } catch (Throwable t) {
+        log.error("Failed to free tablet memory", t);
+      }
+      
+      if (!failed) {
+        lastMinorCompactionFinishTime = System.currentTimeMillis();
+      }
+      if (tabletServer.mincMetrics.isEnabled())
+        tabletServer.mincMetrics.add(TabletServerMinCMetrics.minc, (lastMinorCompactionFinishTime - start));
+      if (hasQueueTime) {
+        timer.updateTime(Operation.MINOR, queued, start, count, failed);
+        if (tabletServer.mincMetrics.isEnabled())
+          tabletServer.mincMetrics.add(TabletServerMinCMetrics.queue, (start - queued));
+      } else
+        timer.updateTime(Operation.MINOR, start, count, failed);
+    }
+  }
+  
+  private class MinorCompactionTask implements Runnable {
+    
+    private long queued;
+    private CommitSession commitSession;
+    private DataFileValue stats;
+    private FileRef mergeFile;
+    private long flushId;
+    private MinorCompactionReason mincReason;
+    
+    MinorCompactionTask(FileRef mergeFile, CommitSession commitSession, long flushId, MinorCompactionReason mincReason) {
+      queued = System.currentTimeMillis();
+      minorCompactionWaitingToStart = true;
+      this.commitSession = commitSession;
+      this.mergeFile = mergeFile;
+      this.flushId = flushId;
+      this.mincReason = mincReason;
+    }
+    
+    @Override
+    public void run() {
+      minorCompactionWaitingToStart = false;
+      minorCompactionInProgress = true;
+      Span minorCompaction = Trace.on("minorCompaction");
+      try {
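+        // prefix the new file with "F" for a flush-only minor compaction, or
+        // "M" when merging with an existing file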
+        FileRef newMapfileLocation = getNextMapFilename(mergeFile == null ? "F" : "M");
+        FileRef tmpFileRef = new FileRef(newMapfileLocation.path() + "_tmp");
+        Span span = Trace.start("waitForCommits");
+        synchronized (Tablet.this) {
+          commitSession.waitForCommitsToFinish();
+        }
+        span.stop();
+        span = Trace.start("start");
+        while (true) {
+          try {
+            // the purpose of the minor compaction start event is to keep track of the filename... in the case
+            // where the metadata table write for the minor compaction finishes and the process dies before
+            // writing the minor compaction finish event, the start event+filename in the metadata table will
+            // prevent recovery of duplicate data... the minor compaction start event could be written at any time
+            // before the metadata write for the minor compaction
+            tabletServer.minorCompactionStarted(commitSession, commitSession.getWALogSeq() + 1, newMapfileLocation.path().toString());
+            break;
+          } catch (IOException e) {
+            log.warn("Failed to write to write ahead log " + e.getMessage(), e);
+          }
+        }
+        span.stop();
+        span = Trace.start("compact");
+        this.stats = minorCompact(conf, fs, tabletMemory.getMinCMemTable(), tmpFileRef, newMapfileLocation, mergeFile, true, queued, commitSession, flushId,
+            mincReason);
+        span.stop();
+        
+        if (needsSplit()) {
+          tabletServer.executeSplit(Tablet.this);
+        } else {
+          initiateMajorCompaction(MajorCompactionReason.NORMAL);
+        }
+      } catch (Throwable t) {
+        log.error("Unknown error during minor compaction for extent: " + getExtent(), t);
+        throw new RuntimeException(t);
+      } finally {
+        minorCompactionInProgress = false;
+        minorCompaction.data("extent", extent.toString());
+        minorCompaction.data("numEntries", Long.toString(this.stats.getNumEntries()));
+        minorCompaction.data("size", Long.toString(this.stats.getSize()));
+        minorCompaction.stop();
+      }
+    }
+  }
+  
+  private synchronized MinorCompactionTask prepareForMinC(long flushId, MinorCompactionReason mincReason) {
+    CommitSession oldCommitSession = tabletMemory.prepareForMinC();
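+    // set aside the write-ahead logs that cover the memory being compacted; new
+    // mutations will be tracked against a fresh set of logs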
+    otherLogs = currentLogs;
+    currentLogs = new HashSet<DfsLogger>();
+    
+    FileRef mergeFile = datafileManager.reserveMergingMinorCompactionFile();
+    
+    return new MinorCompactionTask(mergeFile, oldCommitSession, flushId, mincReason);
+    
+  }
+  
+  void flush(long tableFlushID) {
+    boolean updateMetadata = false;
+    boolean initiateMinor = false;
+    
+    try {
+      
+      synchronized (this) {
+        
+        // only one thread at a time may update the flush ID, to ensure that the metadata table and the tablet's in-memory state stay consistent
+        if (updatingFlushID)
+          return;
+        
+        if (lastFlushID >= tableFlushID)
+          return;
+        
+        if (closing || closed || tabletMemory.memoryReservedForMinC())
+          return;
+        
+        if (tabletMemory.getMemTable().getNumEntries() == 0) {
+          lastFlushID = tableFlushID;
+          updatingFlushID = true;
+          updateMetadata = true;
+        } else
+          initiateMinor = true;
+      }
+      
+      if (updateMetadata) {
+        Credentials creds = SystemCredentials.get();
+        // if multiple threads were allowed to update this outside of a sync block, then it would be
+        // a race condition
+        MetadataTableUtil.updateTabletFlushID(extent, tableFlushID, creds, tabletServer.getLock());
+      } else if (initiateMinor)
+        initiateMinorCompaction(tableFlushID, MinorCompactionReason.USER);
+      
+    } finally {
+      if (updateMetadata) {
+        synchronized (this) {
+          updatingFlushID = false;
+          this.notifyAll();
+        }
+      }
+    }
+    
+  }
+  
+  boolean initiateMinorCompaction(MinorCompactionReason mincReason) {
+    if (isClosed()) {
+      // don't bother trying to get the flush id if closed... could be closed after this check but that is ok... just trying to cut down on unneeded log messages...
+      return false;
+    }
+    
+    // get the flush id before the new memmap is made available for write
+    long flushId;
+    try {
+      flushId = getFlushID();
+    } catch (NoNodeException e) {
+      log.info("Asked to initiate MinC when there was no flush id " + getExtent() + " " + e.getMessage());
+      return false;
+    }
+    return initiateMinorCompaction(flushId, mincReason);
+  }
+  
+  boolean minorCompactNow(MinorCompactionReason mincReason) {
+    long flushId;
+    try {
+      flushId = getFlushID();
+    } catch (NoNodeException e) {
+      log.info("Asked to initiate MinC when there was no flush id " + getExtent() + " " + e.getMessage());
+      return false;
+    }
+    MinorCompactionTask mct = createMinorCompactionTask(flushId, mincReason);
+    if (mct == null)
+      return false;
+    mct.run();
+    return true;
+  }
+  
+  boolean initiateMinorCompaction(long flushId, MinorCompactionReason mincReason) {
+    MinorCompactionTask mct = createMinorCompactionTask(flushId, mincReason);
+    if (mct == null)
+      return false;
+    tabletResources.executeMinorCompaction(mct);
+    return true;
+  }
+  
+  private MinorCompactionTask createMinorCompactionTask(long flushId, MinorCompactionReason mincReason) {
+    MinorCompactionTask mct;
+    long t1, t2;
+    
+    StringBuilder logMessage = null;
+    
+    try {
+      synchronized (this) {
+        t1 = System.currentTimeMillis();
+        
+        if (closing || closed || majorCompactionWaitingToStart || tabletMemory.memoryReservedForMinC() || tabletMemory.getMemTable().getNumEntries() == 0
+            || updatingFlushID) {
+          
+          logMessage = new StringBuilder();
+          
+          logMessage.append(extent.toString());
+          logMessage.append(" closing " + closing);
+          logMessage.append(" closed " + closed);
+          logMessage.append(" majorCompactionWaitingToStart " + majorCompactionWaitingToStart);
+          if (tabletMemory != null)
+            logMessage.append(" tabletMemory.memoryReservedForMinC() " + tabletMemory.memoryReservedForMinC());
+          if (tabletMemory != null && tabletMemory.getMemTable() != null)
+            logMessage.append(" tabletMemory.getMemTable().getNumEntries() " + tabletMemory.getMemTable().getNumEntries());
+          logMessage.append(" updatingFlushID " + updatingFlushID);
+          
+          return null;
+        }
+        // We're still recovering log entries
+        if (datafileManager == null) {
+          logMessage = new StringBuilder();
+          logMessage.append(extent.toString());
+          logMessage.append(" datafileManager " + datafileManager);
+          return null;
+        }
+        
+        mct = prepareForMinC(flushId, mincReason);
+        t2 = System.currentTimeMillis();
+      }
+    } finally {
+      // log outside of sync block
+      if (logMessage != null && log.isDebugEnabled())
+        log.debug(logMessage);
+    }
+    
+    log.debug(String.format("MinC initiate lock %.2f secs", (t2 - t1) / 1000.0));
+    return mct;
+  }
+  
+  long getFlushID() throws NoNodeException {
+    try {
+      String zTablePath = Constants.ZROOT + "/" + HdfsZooInstance.getInstance().getInstanceID() + Constants.ZTABLES + "/" + extent.getTableId()
+          + Constants.ZTABLE_FLUSH_ID;
+      return Long.parseLong(new String(ZooReaderWriter.getRetryingInstance().getData(zTablePath, null)));
+    } catch (InterruptedException e) {
+      throw new RuntimeEx

<TRUNCATED>
