accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From e..@apache.org
Subject [1/6] accumulo git commit: ACCUMULO-3462 reduce the opportunity for the queued tablet to get mis-marked
Date Mon, 05 Jan 2015 20:10:32 GMT
Repository: accumulo
Updated Branches:
  refs/heads/master 9022987e0 -> b90b123e1


http://git-wip-us.apache.org/repos/asf/accumulo/blob/07422efc/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java b/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java
index ddae38a..4a99667 100644
--- a/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java
+++ b/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java
@@ -139,20 +139,20 @@ import org.apache.zookeeper.KeeperException.NoNodeException;
  * We need to be able to have the master tell a tabletServer to
  * close this file, and the tablet server to handle all pending client reads
  * before closing
- * 
+ *
  */
 
 /**
- * 
+ *
  * this class just provides an interface to read from a MapFile mostly takes care of reporting start and end keys
- * 
+ *
  * need this because a single row extent can have multiple columns this manages all the columns (each handled by a store) for a single row-extent
- * 
- * 
+ *
+ *
  */
 
 public class Tablet {
-  
+
   enum MajorCompactionReason {
     // do not change the order, the order of this enum determines the order
     // in which queued major compactions are executed
@@ -161,44 +161,44 @@ public class Tablet {
     NORMAL,
     IDLE
   }
-  
+
   enum MinorCompactionReason {
     USER, SYSTEM, CLOSE
   }
 
   public class CommitSession {
-    
+
     private int seq;
     private InMemoryMap memTable;
     private int commitsInProgress;
     private long maxCommittedTime = Long.MIN_VALUE;
-    
+
     private CommitSession(int seq, InMemoryMap imm) {
       this.seq = seq;
       this.memTable = imm;
       commitsInProgress = 0;
     }
-    
+
     public int getWALogSeq() {
       return seq;
     }
-    
+
     private void decrementCommitsInProgress() {
       if (commitsInProgress < 1)
         throw new IllegalStateException("commitsInProgress = " + commitsInProgress);
-      
+
       commitsInProgress--;
       if (commitsInProgress == 0)
         Tablet.this.notifyAll();
     }
-    
+
     private void incrementCommitsInProgress() {
       if (commitsInProgress < 0)
         throw new IllegalStateException("commitsInProgress = " + commitsInProgress);
-      
+
       commitsInProgress++;
     }
-    
+
     private void waitForCommitsToFinish() {
       while (commitsInProgress > 0) {
         try {
@@ -208,105 +208,105 @@ public class Tablet {
         }
       }
     }
-    
+
     public void abortCommit(List<Mutation> value) {
       Tablet.this.abortCommit(this, value);
     }
-    
+
     public void commit(List<Mutation> mutations) {
       Tablet.this.commit(this, mutations);
     }
-    
+
     public Tablet getTablet() {
       return Tablet.this;
     }
-    
+
     public boolean beginUpdatingLogsUsed(ArrayList<DfsLogger> copy, boolean mincFinish) {
       return Tablet.this.beginUpdatingLogsUsed(memTable, copy, mincFinish);
     }
-    
+
     public void finishUpdatingLogsUsed() {
       Tablet.this.finishUpdatingLogsUsed();
     }
-    
+
     public int getLogId() {
       return logId;
     }
-    
+
     public KeyExtent getExtent() {
       return extent;
     }
-    
+
     private void updateMaxCommittedTime(long time) {
       maxCommittedTime = Math.max(time, maxCommittedTime);
     }
-    
+
     private long getMaxCommittedTime() {
       if (maxCommittedTime == Long.MIN_VALUE)
         throw new IllegalStateException("Tried to read max committed time when it was never set");
       return maxCommittedTime;
     }
-    
+
   }
-  
+
   private class TabletMemory {
     private InMemoryMap memTable;
     private InMemoryMap otherMemTable;
     private InMemoryMap deletingMemTable;
     private int nextSeq = 1;
     private CommitSession commitSession;
-    
+
     TabletMemory() {
       memTable = new InMemoryMap(tabletServer.getSystemConfiguration());
       commitSession = new CommitSession(nextSeq, memTable);
       nextSeq += 2;
     }
-    
+
     InMemoryMap getMemTable() {
       return memTable;
     }
-    
+
     InMemoryMap getMinCMemTable() {
       return otherMemTable;
     }
-    
+
     CommitSession prepareForMinC() {
       if (otherMemTable != null) {
         throw new IllegalStateException();
       }
-      
+
       if (deletingMemTable != null) {
         throw new IllegalStateException();
       }
-      
+
       otherMemTable = memTable;
       memTable = new InMemoryMap(tabletServer.getSystemConfiguration());
-      
+
       CommitSession oldCommitSession = commitSession;
       commitSession = new CommitSession(nextSeq, memTable);
       nextSeq += 2;
-      
+
       tabletResources.updateMemoryUsageStats(memTable.estimatedSizeInBytes(), otherMemTable.estimatedSizeInBytes());
-      
+
       return oldCommitSession;
     }
-    
+
     void finishedMinC() {
-      
+
       if (otherMemTable == null) {
         throw new IllegalStateException();
       }
-      
+
       if (deletingMemTable != null) {
         throw new IllegalStateException();
       }
-      
+
       deletingMemTable = otherMemTable;
-      
+
       otherMemTable = null;
       Tablet.this.notifyAll();
     }
-    
+
     void finalizeMinC() {
       try {
         deletingMemTable.delete(15000);
@@ -315,22 +315,22 @@ public class Tablet {
           if (otherMemTable != null) {
             throw new IllegalStateException();
           }
-          
+
           if (deletingMemTable == null) {
             throw new IllegalStateException();
           }
-          
+
           deletingMemTable = null;
-          
+
           tabletResources.updateMemoryUsageStats(memTable.estimatedSizeInBytes(), 0);
         }
       }
     }
-    
+
     boolean memoryReservedForMinC() {
       return otherMemTable != null || deletingMemTable != null;
     }
-    
+
     void waitForMinC() {
       while (otherMemTable != null || deletingMemTable != null) {
         try {
@@ -340,21 +340,21 @@ public class Tablet {
         }
       }
     }
-    
+
     void mutate(CommitSession cm, List<Mutation> mutations) {
       cm.memTable.mutate(mutations);
     }
-    
+
     void updateMemoryUsageStats() {
       long other = 0;
       if (otherMemTable != null)
         other = otherMemTable.estimatedSizeInBytes();
       else if (deletingMemTable != null)
         other = deletingMemTable.estimatedSizeInBytes();
-      
+
       tabletResources.updateMemoryUsageStats(memTable.estimatedSizeInBytes(), other);
     }
-    
+
     List<MemoryIterator> getIterators() {
       List<MemoryIterator> toReturn = new ArrayList<MemoryIterator>(2);
       toReturn.add(memTable.skvIterator());
@@ -362,50 +362,50 @@ public class Tablet {
         toReturn.add(otherMemTable.skvIterator());
       return toReturn;
     }
-    
+
     void returnIterators(List<MemoryIterator> iters) {
       for (MemoryIterator iter : iters) {
         iter.close();
       }
     }
-    
+
     public long getNumEntries() {
       if (otherMemTable != null)
         return memTable.getNumEntries() + otherMemTable.getNumEntries();
       return memTable.getNumEntries();
     }
-    
+
     CommitSession getCommitSession() {
       return commitSession;
     }
   }
-  
+
   private TabletMemory tabletMemory;
-  
+
   private final TabletTime tabletTime;
   private long persistedTime;
   private final Object timeLock = new Object();
-  
+
   private final Path location; // absolute path of this tablets dir
   private TServerInstance lastLocation;
-  
+
   private Configuration conf;
   private FileSystem fs;
-  
+
   private TableConfiguration acuTableConf;
-  
+
   private volatile boolean tableDirChecked = false;
-  
+
   private AtomicLong dataSourceDeletions = new AtomicLong(0);
   private Set<ScanDataSource> activeScans = new HashSet<ScanDataSource>();
-  
+
   private volatile boolean closing = false;
   private boolean closed = false;
   private boolean closeComplete = false;
-  
+
   private long lastFlushID = -1;
   private long lastCompactID = -1;
-  
+
   private static class CompactionWaitInfo {
     long flushID = -1;
     long compactionID = -1;
@@ -415,7 +415,7 @@ public class Tablet {
   private CompactionWaitInfo compactionWaitInfo = new CompactionWaitInfo();
 
   private KeyExtent extent;
-  
+
   private TabletResourceManager tabletResources;
   final private DatafileManager datafileManager;
   private volatile boolean majorCompactionInProgress = false;
@@ -423,127 +423,127 @@ public class Tablet {
   private Set<MajorCompactionReason> majorCompactionQueued = Collections.synchronizedSet(EnumSet.noneOf(MajorCompactionReason.class));
   private volatile boolean minorCompactionInProgress = false;
   private volatile boolean minorCompactionWaitingToStart = false;
-  
+
   private boolean updatingFlushID = false;
-  
+
   private AtomicReference<ConstraintChecker> constraintChecker = new AtomicReference<ConstraintChecker>();
-  
+
   private final String tabletDirectory;
-  
+
   private int writesInProgress = 0;
-  
+
   private static final Logger log = Logger.getLogger(Tablet.class);
   public TabletStatsKeeper timer;
-  
+
   private Rate queryRate = new Rate(0.2);
   private long queryCount = 0;
-  
+
   private Rate queryByteRate = new Rate(0.2);
   private long queryBytes = 0;
-  
+
   private Rate ingestRate = new Rate(0.2);
   private long ingestCount = 0;
-  
+
   private Rate ingestByteRate = new Rate(0.2);
   private long ingestBytes = 0;
-  
+
   private byte[] defaultSecurityLabel = new byte[0];
-  
+
   private long lastMinorCompactionFinishTime;
   private long lastMapFileImportTime;
-  
+
   private volatile long numEntries;
   private volatile long numEntriesInMemory;
-  
+
 
   // a count of the amount of data read by the iterators
   private AtomicLong scannedCount = new AtomicLong(0);
   private Rate scannedRate = new Rate(0.2);
 
   private ConfigurationObserver configObserver;
-  
+
   private TabletServer tabletServer;
-  
+
   private final int logId;
   // ensure we only have one reader/writer of our bulk file notes at a time
   public final Object bulkFileImportLock = new Object();
-  
+
   public int getLogId() {
     return logId;
   }
-  
+
   public static class TabletClosedException extends RuntimeException {
     public TabletClosedException(Exception e) {
       super(e);
     }
-    
+
     public TabletClosedException() {
       super();
     }
-    
+
     private static final long serialVersionUID = 1L;
   }
-  
+
   String getNextMapFilename(String prefix) throws IOException {
     String extension = FileOperations.getNewFileExtension(tabletServer.getTableConfiguration(extent));
     checkTabletDir();
     return location.toString() + "/" + prefix + UniqueNameAllocator.getInstance().getNextName() + "." + extension;
   }
-  
+
   private void checkTabletDir() throws IOException {
     if (!tableDirChecked) {
       checkTabletDir(this.location);
       tableDirChecked = true;
     }
   }
-  
+
   private void checkTabletDir(Path tabletDir) throws IOException {
-    
+
     FileStatus[] files = null;
     try {
       files = fs.listStatus(tabletDir);
     } catch (FileNotFoundException ex) {
       // ignored
     }
-    
+
     if (files == null) {
       if (tabletDir.getName().startsWith("c-"))
         log.debug("Tablet " + extent + " had no dir, creating " + tabletDir); // its a clone dir...
       else
         log.warn("Tablet " + extent + " had no dir, creating " + tabletDir);
-      
+
       fs.mkdirs(tabletDir);
     }
   }
-  
+
   private static String rel2abs(String relPath, KeyExtent extent) {
     if (relPath.startsWith("../"))
       return ServerConstants.getTablesDir() + relPath.substring(2);
     else
       return ServerConstants.getTablesDir() + "/" + extent.getTableId() + relPath;
   }
-  
+
   class DatafileManager {
     // access to datafilesizes needs to be synchronized: see CompactionRunner#getNumFiles
     final private Map<Path,DataFileValue> datafileSizes = Collections.synchronizedMap(new TreeMap<Path,DataFileValue>());
-    
+
     DatafileManager(SortedMap<String,DataFileValue> datafileSizes) {
       for (Entry<String,DataFileValue> datafiles : datafileSizes.entrySet())
         this.datafileSizes.put(new Path(rel2abs(datafiles.getKey(), extent)), datafiles.getValue());
     }
-    
+
     Path mergingMinorCompactionFile = null;
     Set<Path> filesToDeleteAfterScan = new HashSet<Path>();
     Map<Long,Set<Path>> scanFileReservations = new HashMap<Long,Set<Path>>();
     MapCounter<Path> fileScanReferenceCounts = new MapCounter<Path>();
     long nextScanReservationId = 0;
     boolean reservationsBlocked = false;
-    
+
     Set<Path> majorCompactingFiles = new HashSet<Path>();
-    
+
     Pair<Long,Map<String,DataFileValue>> reserveFilesForScan() {
       synchronized (Tablet.this) {
-        
+
         while (reservationsBlocked) {
           try {
             Tablet.this.wait(50);
@@ -551,34 +551,34 @@ public class Tablet {
             log.warn(e, e);
           }
         }
-        
+
         Set<Path> absFilePaths = new HashSet<Path>(datafileSizes.keySet());
-        
+
         long rid = nextScanReservationId++;
-        
+
         scanFileReservations.put(rid, absFilePaths);
-        
+
         Map<String,DataFileValue> ret = new HashMap<String,MetadataTable.DataFileValue>();
-        
+
         for (Path path : absFilePaths) {
           fileScanReferenceCounts.increment(path, 1);
           ret.put(path.toString(), datafileSizes.get(path));
         }
-        
+
         return new Pair<Long,Map<String,DataFileValue>>(rid, ret);
       }
     }
-    
+
     void returnFilesForScan(Long reservationId) {
-      
+
       final Set<Path> filesToDelete = new HashSet<Path>();
-      
+
       synchronized (Tablet.this) {
         Set<Path> absFilePaths = scanFileReservations.remove(reservationId);
-        
+
         if (absFilePaths == null)
           throw new IllegalArgumentException("Unknown scan reservation id " + reservationId);
-        
+
         boolean notify = false;
         for (Path path : absFilePaths) {
           long refCount = fileScanReferenceCounts.decrement(path, 1);
@@ -589,32 +589,32 @@ public class Tablet {
           } else if (refCount < 0)
             throw new IllegalStateException("Scan ref count for " + path + " is " + refCount);
         }
-        
+
         if (notify)
           Tablet.this.notifyAll();
       }
-      
+
       if (filesToDelete.size() > 0) {
         log.debug("Removing scan refs from metadata " + extent + " " + abs2rel(filesToDelete));
         MetadataTable.removeScanFiles(extent, abs2rel(filesToDelete), SecurityConstants.getSystemCredentials(), tabletServer.getLock());
       }
     }
-    
+
     private void removeFilesAfterScanRel(Set<String> relPaths) {
       Set<Path> scanFiles = new HashSet<Path>();
-      
+
       for (String rpath : relPaths)
         scanFiles.add(new Path(ServerConstants.getTablesDir() + "/" + extent.getTableId() + rpath));
-      
+
       removeFilesAfterScan(scanFiles);
     }
-    
+
     private void removeFilesAfterScan(Set<Path> scanFiles) {
       if (scanFiles.size() == 0)
         return;
-      
+
       Set<Path> filesToDelete = new HashSet<Path>();
-      
+
       synchronized (Tablet.this) {
         for (Path path : scanFiles) {
           if (fileScanReferenceCounts.get(path) == 0)
@@ -623,26 +623,26 @@ public class Tablet {
             filesToDeleteAfterScan.add(path);
         }
       }
-      
+
       if (filesToDelete.size() > 0) {
         log.debug("Removing scan refs from metadata " + extent + " " + abs2rel(filesToDelete));
         MetadataTable.removeScanFiles(extent, abs2rel(filesToDelete), SecurityConstants.getSystemCredentials(), tabletServer.getLock());
       }
     }
-    
+
     private TreeSet<Path> waitForScansToFinish(Set<Path> pathsToWaitFor, boolean blockNewScans, long maxWaitTime) {
       long startTime = System.currentTimeMillis();
       TreeSet<Path> inUse = new TreeSet<Path>();
-      
+
       Span waitForScans = Trace.start("waitForScans");
       synchronized (Tablet.this) {
         if (blockNewScans) {
           if (reservationsBlocked)
             throw new IllegalStateException();
-          
+
           reservationsBlocked = true;
         }
-        
+
         for (Path path : pathsToWaitFor) {
           while (fileScanReferenceCounts.get(path) > 0 && System.currentTimeMillis() - startTime < maxWaitTime) {
             try {
@@ -652,43 +652,43 @@ public class Tablet {
             }
           }
         }
-        
+
         for (Path path : pathsToWaitFor) {
           if (fileScanReferenceCounts.get(path) > 0)
             inUse.add(path);
         }
-        
+
         if (blockNewScans) {
           reservationsBlocked = false;
           Tablet.this.notifyAll();
         }
-        
+
       }
       waitForScans.stop();
       return inUse;
     }
-    
+
     public void importMapFiles(long tid, Map<String,DataFileValue> pathsString, boolean setTime) throws IOException {
-      
+
       String bulkDir = null;
-      
+
       Map<Path,DataFileValue> paths = new HashMap<Path,MetadataTable.DataFileValue>();
       for (Entry<String,DataFileValue> entry : pathsString.entrySet())
         paths.put(new Path(entry.getKey()), entry.getValue());
-      
+
       for (Path tpath : paths.keySet()) {
-        
+
         if (!tpath.getParent().getParent().equals(new Path(ServerConstants.getTablesDir() + "/" + extent.getTableId()))) {
           throw new IOException("Map file " + tpath + " not in table dir " + ServerConstants.getTablesDir() + "/" + extent.getTableId());
         }
-        
+
         if (bulkDir == null)
           bulkDir = tpath.getParent().toString();
         else if (!bulkDir.equals(tpath.getParent().toString()))
           throw new IllegalArgumentException("bulk files in different dirs " + bulkDir + " " + tpath);
-        
+
       }
-      
+
       if (extent.isRootTablet()) {
         throw new IllegalArgumentException("Can not import files to root tablet");
       }
@@ -706,7 +706,7 @@ public class Tablet {
         for (String file : files)
           if (paths.keySet().remove(new Path(ServerConstants.getTablesDir() + "/" + extent.getTableId() + file)))
             log.debug("Ignoring request to re-import a file already imported: " + extent + ": " + file);
-        
+
         if (paths.size() > 0) {
           long bulkTime = Long.MIN_VALUE;
           if (setTime) {
@@ -722,40 +722,40 @@ public class Tablet {
           synchronized (timeLock) {
             if (bulkTime > persistedTime)
               persistedTime = bulkTime;
-          
+
             MetadataTable.updateTabletDataFile(tid, extent, abs2rel(paths), tabletTime.getMetadataValue(persistedTime), auths, tabletServer.getLock());
           }
         }
       }
-      
+
       synchronized (Tablet.this) {
         for (Entry<Path,DataFileValue> tpath : paths.entrySet()) {
           if (datafileSizes.containsKey(tpath.getKey())) {
             log.error("Adding file that is already in set " + tpath.getKey());
           }
           datafileSizes.put(tpath.getKey(), tpath.getValue());
-          
+
         }
-        
+
         tabletResources.importedMapFiles();
-        
+
         computeNumEntries();
       }
-      
+
       for (Entry<Path,DataFileValue> entry : paths.entrySet()) {
         log.log(TLevel.TABLET_HIST, extent + " import " + abs2rel(entry.getKey()) + " " + entry.getValue());
       }
     }
-    
+
     String reserveMergingMinorCompactionFile() {
       if (mergingMinorCompactionFile != null)
         throw new IllegalStateException("Tried to reserve merging minor compaction file when already reserved  : " + mergingMinorCompactionFile);
-      
+
       if (extent.isRootTablet())
         return null;
-      
+
       int maxFiles = acuTableConf.getMaxFilesPerTablet();
-      
+
       // when a major compaction is running and we are at max files, write out
       // one extra file... want to avoid the case where major compaction is
       // compacting everything except for the largest file, and therefore the
@@ -764,45 +764,45 @@ public class Tablet {
       // are canceled
       if (majorCompactingFiles.size() > 0 && datafileSizes.size() == maxFiles)
         return null;
-      
+
       if (datafileSizes.size() >= maxFiles) {
         // find the smallest file
-        
+
         long min = Long.MAX_VALUE;
         Path minName = null;
-        
+
         for (Entry<Path,DataFileValue> entry : datafileSizes.entrySet()) {
           if (entry.getValue().getSize() < min && !majorCompactingFiles.contains(entry.getKey())) {
             min = entry.getValue().getSize();
             minName = entry.getKey();
           }
         }
-        
+
         if (minName == null)
           return null;
-        
+
         mergingMinorCompactionFile = minName;
         return minName.toString();
       }
-      
+
       return null;
     }
-    
+
     void unreserveMergingMinorCompactionFile(Path file) {
       if ((file == null && mergingMinorCompactionFile != null) || (file != null && mergingMinorCompactionFile == null)
           || (file != null && mergingMinorCompactionFile != null && !file.equals(mergingMinorCompactionFile)))
         throw new IllegalStateException("Disagreement " + file + " " + mergingMinorCompactionFile);
-      
+
       mergingMinorCompactionFile = null;
     }
-    
+
     void bringMinorCompactionOnline(String tmpDatafile, String newDatafile, String absMergeFile, DataFileValue dfv, CommitSession commitSession, long flushId) {
       bringMinorCompactionOnline(new Path(tmpDatafile), new Path(newDatafile), absMergeFile == null ? null : new Path(absMergeFile), dfv, commitSession,
           flushId);
     }
-    
+
     void bringMinorCompactionOnline(Path tmpDatafile, Path newDatafile, Path absMergeFile, DataFileValue dfv, CommitSession commitSession, long flushId) {
-      
+
       IZooReaderWriter zoo = ZooReaderWriter.getRetryingInstance();
       if (extent.isRootTablet()) {
         try {
@@ -813,7 +813,7 @@ public class Tablet {
           throw new IllegalStateException("Can not bring major compaction online, lock not held", e);
         }
       }
-      
+
       // rename before putting in metadata table, so files in metadata table should
       // always exist
       do {
@@ -825,7 +825,7 @@ public class Tablet {
               log.warn("Target map file already exist " + newDatafile);
               fs.delete(newDatafile, true);
             }
-            
+
             rename(fs, tmpDatafile, newDatafile);
           }
           break;
@@ -834,9 +834,9 @@ public class Tablet {
           UtilWaitThread.sleep(60 * 1000);
         }
       } while (true);
-      
+
       long t1, t2;
-      
+
       // the code below always assumes merged files are in use by scans... this must be done
       // because the in memory list of files is not updated until after the metadata table
       // therefore the file is available to scans until memory is updated, but want to ensure
@@ -849,12 +849,12 @@ public class Tablet {
       Set<Path> filesInUseByScans = Collections.emptySet();
       if (absMergeFile != null)
         filesInUseByScans = Collections.singleton(absMergeFile);
-      
+
       // very important to write delete entries outside of log lock, because
       // this !METADATA write does not go up... it goes sideways or to itself
       if (absMergeFile != null)
         MetadataTable.addDeleteEntries(extent, Collections.singleton(abs2rel(absMergeFile)), SecurityConstants.getSystemCredentials());
-      
+
       Set<String> unusedWalLogs = beginClearingUnusedLogs();
       try {
         // the order of writing to !METADATA and walog is important in the face of machine/process failures
@@ -862,25 +862,25 @@ public class Tablet {
         // data could be lost... the minor compaction start event should be written before the following metadata
         // write is made
         TCredentials creds = SecurityConstants.getSystemCredentials();
-        
+
         synchronized (timeLock) {
           if (commitSession.getMaxCommittedTime() > persistedTime)
             persistedTime = commitSession.getMaxCommittedTime();
-          
+
           String time = tabletTime.getMetadataValue(persistedTime);
           MetadataTable.updateTabletDataFile(extent, abs2rel(newDatafile), abs2rel(absMergeFile), dfv, time, creds, abs2rel(filesInUseByScans),
               tabletServer.getClientAddressString(), tabletServer.getLock(), unusedWalLogs, lastLocation, flushId);
         }
-        
+
       } finally {
         finishClearingUnusedLogs();
       }
-      
+
       do {
         try {
           // the purpose of making this update use the new commit session, instead of the old one passed in,
           // is because the new one will reference the logs used by current memory...
-          
+
           tabletServer.minorCompactionFinished(tabletMemory.getCommitSession(), newDatafile.toString(), commitSession.getWALogSeq() + 2);
           break;
         } catch (IOException e) {
@@ -888,37 +888,37 @@ public class Tablet {
           UtilWaitThread.sleep(1 * 1000);
         }
       } while (true);
-      
+
       synchronized (Tablet.this) {
         lastLocation = null;
-        
+
         t1 = System.currentTimeMillis();
         if (datafileSizes.containsKey(newDatafile)) {
           log.error("Adding file that is already in set " + newDatafile);
         }
-        
+
         if (dfv.getNumEntries() > 0) {
           datafileSizes.put(newDatafile, dfv);
         }
-        
+
         if (absMergeFile != null) {
           datafileSizes.remove(absMergeFile);
         }
-        
+
         unreserveMergingMinorCompactionFile(absMergeFile);
-        
+
         dataSourceDeletions.incrementAndGet();
         tabletMemory.finishedMinC();
-        
+
         lastFlushID = flushId;
-        
+
         computeNumEntries();
         t2 = System.currentTimeMillis();
       }
-      
+
       // must do this after list of files in memory is updated above
       removeFilesAfterScan(filesInUseByScans);
-      
+
       if (absMergeFile != null)
         log.log(TLevel.TABLET_HIST, extent + " MinC [" + abs2rel(absMergeFile) + ",memory] -> " + abs2rel(newDatafile));
       else
@@ -928,95 +928,95 @@ public class Tablet {
         log.debug(String.format("Minor Compaction wrote out file larger than split threshold.  split threshold = %,d  file size = %,d",
             acuTableConf.getMemoryInBytes(Property.TABLE_SPLIT_THRESHOLD), dfv.getSize()));
       }
-      
+
     }
-    
+
     private Map<String,DataFileValue> abs2rel(Map<Path,DataFileValue> paths) {
       TreeMap<String,DataFileValue> relMap = new TreeMap<String,MetadataTable.DataFileValue>();
-      
+
       for (Entry<Path,DataFileValue> entry : paths.entrySet())
         relMap.put(abs2rel(entry.getKey()), entry.getValue());
-      
+
       return relMap;
     }
-    
+
     private Set<String> abs2rel(Set<Path> absPaths) {
       Set<String> relativePaths = new TreeSet<String>();
       for (Path absPath : absPaths)
         relativePaths.add(abs2rel(absPath));
-      
+
       return relativePaths;
     }
-    
+
     private Set<Path> string2path(Set<String> strings) {
       Set<Path> paths = new HashSet<Path>();
       for (String path : strings)
         paths.add(new Path(path));
-      
+
       return paths;
     }
-    
+
     private String abs2rel(Path absPath) {
       if (absPath == null)
         return null;
-      
+
       if (absPath.getParent().getParent().getName().equals(extent.getTableId().toString()))
         return "/" + absPath.getParent().getName() + "/" + absPath.getName();
       else
         return "../" + absPath.getParent().getParent().getName() + "/" + absPath.getParent().getName() + "/" + absPath.getName();
     }
-    
+
     public void reserveMajorCompactingFiles(Set<String> files) {
       if (majorCompactingFiles.size() != 0)
         throw new IllegalStateException("Major compacting files not empty " + majorCompactingFiles);
-      
+
       Set<Path> mcf = string2path(files);
       if (mergingMinorCompactionFile != null && mcf.contains(mergingMinorCompactionFile))
         throw new IllegalStateException("Major compaction tried to resrve file in use by minor compaction " + mergingMinorCompactionFile);
-      
+
       majorCompactingFiles.addAll(mcf);
     }
-    
+
     public void clearMajorCompactingFile() {
       majorCompactingFiles.clear();
     }
-    
+
     void bringMajorCompactionOnline(Set<String> oldDatafiles, String tmpDatafile, String newDatafile, Long compactionId, DataFileValue dfv) throws IOException {
       bringMajorCompactionOnline(string2path(oldDatafiles), new Path(tmpDatafile), new Path(newDatafile), compactionId, dfv);
     }
-    
+
     void bringMajorCompactionOnline(Set<Path> oldDatafiles, Path tmpDatafile, Path newDatafile, Long compactionId, DataFileValue dfv) throws IOException {
       long t1, t2;
-      
+
       if (!extent.isRootTablet()) {
-        
+
         if (fs.exists(newDatafile)) {
           log.error("Target map file already exist " + newDatafile, new Exception());
           throw new IllegalStateException("Target map file already exist " + newDatafile);
         }
-        
+
         // rename before putting in metadata table, so files in metadata table should
         // always exist
         rename(fs, tmpDatafile, newDatafile);
-        
+
         if (dfv.getNumEntries() == 0) {
           fs.delete(newDatafile, true);
         }
       }
-      
+
       TServerInstance lastLocation = null;
       synchronized (Tablet.this) {
-        
+
         t1 = System.currentTimeMillis();
-        
+
         IZooReaderWriter zoo = ZooReaderWriter.getRetryingInstance();
-        
+
         dataSourceDeletions.incrementAndGet();
-        
+
         if (extent.isRootTablet()) {
-          
+
           waitForScansToFinish(oldDatafiles, true, Long.MAX_VALUE);
-          
+
           try {
             if (!zoo.isLockHeld(tabletServer.getLock().getLockID())) {
               throw new IllegalStateException();
@@ -1024,25 +1024,25 @@ public class Tablet {
           } catch (Exception e) {
             throw new IllegalStateException("Can not bring major compaction online, lock not held", e);
           }
-          
+
           // mark files as ready for deletion, but
           // do not delete them until we successfully
           // rename the compacted map file, in case
           // the system goes down
-          
+
           String compactName = newDatafile.getName();
-          
+
           for (Path path : oldDatafiles) {
             rename(fs, path, new Path(location + "/delete+" + compactName + "+" + path.getName()));
           }
-          
+
           if (fs.exists(newDatafile)) {
             log.error("Target map file already exist " + newDatafile, new Exception());
             throw new IllegalStateException("Target map file already exist " + newDatafile);
           }
-          
+
           rename(fs, tmpDatafile, newDatafile);
-          
+
           // start deleting files, if we do not finish they will be cleaned
           // up later
           Trash trash = new Trash(fs, fs.getConf());
@@ -1052,7 +1052,7 @@ public class Tablet {
               fs.delete(deleteFile, true);
           }
         }
-        
+
         // atomically remove old files and add new file
         for (Path oldDatafile : oldDatafiles) {
           if (!datafileSizes.containsKey(oldDatafile)) {
@@ -1061,29 +1061,29 @@ public class Tablet {
           datafileSizes.remove(oldDatafile);
           majorCompactingFiles.remove(oldDatafile);
         }
-        
+
         if (datafileSizes.containsKey(newDatafile)) {
           log.error("Adding file that is already in set " + newDatafile);
         }
-        
+
         if (dfv.getNumEntries() > 0) {
           datafileSizes.put(newDatafile, dfv);
         }
-        
+
         // could be used by a follow on compaction in a multipass compaction
         majorCompactingFiles.add(newDatafile);
-        
+
         computeNumEntries();
-        
+
         lastLocation = Tablet.this.lastLocation;
         Tablet.this.lastLocation = null;
-        
+
         if (compactionId != null)
           lastCompactID = compactionId;
-        
+
         t2 = System.currentTimeMillis();
       }
-      
+
       if (!extent.isRootTablet()) {
         Set<Path> filesInUseByScans = waitForScansToFinish(oldDatafiles, false, 10000);
         if (filesInUseByScans.size() > 0)
@@ -1092,37 +1092,37 @@ public class Tablet {
             SecurityConstants.getSystemCredentials(), tabletServer.getClientAddressString(), lastLocation, tabletServer.getLock());
         removeFilesAfterScan(filesInUseByScans);
       }
-      
+
       log.debug(String.format("MajC finish lock %.2f secs", (t2 - t1) / 1000.0));
       log.log(TLevel.TABLET_HIST, extent + " MajC " + abs2rel(oldDatafiles) + " --> " + abs2rel(newDatafile));
     }
-    
+
     public SortedMap<String,DataFileValue> getDatafileSizesRel() {
       synchronized (Tablet.this) {
         TreeMap<String,DataFileValue> files = new TreeMap<String,MetadataTable.DataFileValue>();
         Set<Entry<Path,DataFileValue>> es = datafileSizes.entrySet();
-        
+
         for (Entry<Path,DataFileValue> entry : es) {
           files.put(abs2rel(entry.getKey()), entry.getValue());
         }
-        
+
         return Collections.unmodifiableSortedMap(files);
       }
     }
-    
+
     public SortedMap<String,DataFileValue> getDatafileSizes() {
       synchronized (Tablet.this) {
         TreeMap<String,DataFileValue> files = new TreeMap<String,MetadataTable.DataFileValue>();
         Set<Entry<Path,DataFileValue>> es = datafileSizes.entrySet();
-        
+
         for (Entry<Path,DataFileValue> entry : es) {
           files.put(entry.getKey().toString(), entry.getValue());
         }
-        
+
         return Collections.unmodifiableSortedMap(files);
       }
     }
-    
+
     public Set<String> getFiles() {
       synchronized (Tablet.this) {
         HashSet<String> files = new HashSet<String>();
@@ -1132,38 +1132,38 @@ public class Tablet {
         return Collections.unmodifiableSet(files);
       }
     }
-    
+
   }
-  
+
   public Tablet(TabletServer tabletServer, Text location, KeyExtent extent, TabletResourceManager trm, SortedMap<Key,Value> tabletsKeyValues)
       throws IOException {
     this(tabletServer, location, extent, trm, CachedConfiguration.getInstance(), tabletsKeyValues);
     splitCreationTime = 0;
   }
-  
+
   public Tablet(TabletServer tabletServer, Text location, KeyExtent extent, TabletResourceManager trm, SortedMap<String,DataFileValue> datafiles, String time,
       long initFlushID, long initCompactID) throws IOException {
     this(tabletServer, location, extent, trm, CachedConfiguration.getInstance(), datafiles, time, initFlushID, initCompactID);
     splitCreationTime = System.currentTimeMillis();
   }
-  
+
   private Tablet(TabletServer tabletServer, Text location, KeyExtent extent, TabletResourceManager trm, Configuration conf,
       SortedMap<Key,Value> tabletsKeyValues) throws IOException {
     this(tabletServer, location, extent, trm, conf, TraceFileSystem.wrap(FileUtil.getFileSystem(conf, ServerConfiguration.getSiteConfiguration())),
         tabletsKeyValues);
   }
-  
+
   static private final List<LogEntry> EMPTY = Collections.emptyList();
-  
+
   private Tablet(TabletServer tabletServer, Text location, KeyExtent extent, TabletResourceManager trm, Configuration conf,
       SortedMap<String,DataFileValue> datafiles, String time, long initFlushID, long initCompactID) throws IOException {
     this(tabletServer, location, extent, trm, conf, TraceFileSystem.wrap(FileUtil.getFileSystem(conf, ServerConfiguration.getSiteConfiguration())), EMPTY,
         datafiles, time, null, new HashSet<String>(), initFlushID, initCompactID);
   }
-  
+
   private static String lookupTime(AccumuloConfiguration conf, KeyExtent extent, SortedMap<Key,Value> tabletsKeyValues) {
     SortedMap<Key,Value> entries;
-    
+
     if (extent.isRootTablet()) {
       return null;
     } else {
@@ -1175,20 +1175,20 @@ public class Tablet {
         }
       }
     }
-    
+
     // log.debug("extent : "+extent+"   entries : "+entries);
-    
+
     if (entries.size() == 1)
       return entries.values().iterator().next().toString();
     return null;
   }
-  
+
   private static SortedMap<String,DataFileValue> lookupDatafiles(AccumuloConfiguration conf, Text locText, FileSystem fs, KeyExtent extent,
       SortedMap<Key,Value> tabletsKeyValues) throws IOException {
     Path location = new Path(ServerConstants.getTablesDir() + "/" + extent.getTableId().toString() + locText.toString());
-    
+
     TreeMap<String,DataFileValue> datafiles = new TreeMap<String,DataFileValue>();
-    
+
     if (extent.isRootTablet()) { // the meta0 tablet
       // cleanUpFiles() has special handling for delete. files
       FileStatus[] files = fs.listStatus(location);
@@ -1203,49 +1203,49 @@ public class Tablet {
         datafiles.put(locText.toString() + "/" + filename, dfv);
       }
     } else {
-      
+
       SortedMap<Key,Value> datafilesMetadata;
-      
+
       Text rowName = extent.getMetadataEntry();
-        
+
       ScannerImpl mdScanner = new ScannerImpl(HdfsZooInstance.getInstance(), SecurityConstants.getSystemCredentials(), Constants.METADATA_TABLE_ID,
           Constants.NO_AUTHS);
-      
+
       // Commented out because when no data file is present, each tablet will scan through metadata table and return nothing
       // reduced batch size to improve performance
       // changed here after endKeys were implemented from 10 to 1000
       mdScanner.setBatchSize(1000);
-      
+
       // leave these in, again, now using endKey for safety
       mdScanner.fetchColumnFamily(Constants.METADATA_DATAFILE_COLUMN_FAMILY);
-      
+
       mdScanner.setRange(new Range(rowName));
-      
+
       datafilesMetadata = new TreeMap<Key,Value>();
-      
+
       for (Entry<Key,Value> entry : mdScanner) {
-        
+
         if (entry.getKey().compareRow(rowName) != 0) {
           break;
         }
-        
+
         datafilesMetadata.put(new Key(entry.getKey()), new Value(entry.getValue()));
       }
-      
+
       Iterator<Entry<Key,Value>> dfmdIter = datafilesMetadata.entrySet().iterator();
-      
+
       while (dfmdIter.hasNext()) {
         Entry<Key,Value> entry = dfmdIter.next();
-        
+
         datafiles.put(entry.getKey().getColumnQualifier().toString(), new DataFileValue(entry.getValue().get()));
       }
     }
     return datafiles;
   }
-  
+
   private static List<LogEntry> lookupLogEntries(KeyExtent ke, SortedMap<Key,Value> tabletsKeyValues) {
     List<LogEntry> logEntries = new ArrayList<LogEntry>();
-    
+
     if (ke.isMeta()) {
       try {
         logEntries = MetadataTable.getLogEntries(SecurityConstants.getSystemCredentials(), ke);
@@ -1264,14 +1264,14 @@ public class Tablet {
         }
       }
     }
-    
+
     log.debug("got " + logEntries + " for logs for " + ke);
     return logEntries;
   }
-  
+
   private static Set<String> lookupScanFiles(KeyExtent extent, SortedMap<Key,Value> tabletsKeyValues) {
     HashSet<String> scanFiles = new HashSet<String>();
-    
+
     Text row = extent.getMetadataEntry();
     for (Entry<Key,Value> entry : tabletsKeyValues.entrySet()) {
       Key key = entry.getKey();
@@ -1279,10 +1279,10 @@ public class Tablet {
         scanFiles.add(key.getColumnQualifier().toString());
       }
     }
-    
+
     return scanFiles;
   }
-  
+
   private static long lookupFlushID(KeyExtent extent, SortedMap<Key,Value> tabletsKeyValues) {
     Text row = extent.getMetadataEntry();
     for (Entry<Key,Value> entry : tabletsKeyValues.entrySet()) {
@@ -1290,10 +1290,10 @@ public class Tablet {
       if (key.getRow().equals(row) && Constants.METADATA_FLUSH_COLUMN.equals(key.getColumnFamily(), key.getColumnQualifier()))
         return Long.parseLong(entry.getValue().toString());
     }
-    
+
     return -1;
   }
-  
+
   private static long lookupCompactID(KeyExtent extent, SortedMap<Key,Value> tabletsKeyValues) {
     Text row = extent.getMetadataEntry();
     for (Entry<Key,Value> entry : tabletsKeyValues.entrySet()) {
@@ -1301,17 +1301,17 @@ public class Tablet {
       if (key.getRow().equals(row) && Constants.METADATA_COMPACT_COLUMN.equals(key.getColumnFamily(), key.getColumnQualifier()))
         return Long.parseLong(entry.getValue().toString());
     }
-    
+
     return -1;
   }
-  
+
   private Tablet(TabletServer tabletServer, Text location, KeyExtent extent, TabletResourceManager trm, Configuration conf, FileSystem fs,
       SortedMap<Key,Value> tabletsKeyValues) throws IOException {
     this(tabletServer, location, extent, trm, conf, fs, lookupLogEntries(extent, tabletsKeyValues), lookupDatafiles(tabletServer.getSystemConfiguration(),
         location, fs, extent, tabletsKeyValues), lookupTime(tabletServer.getSystemConfiguration(), extent, tabletsKeyValues), lookupLastServer(extent,
         tabletsKeyValues), lookupScanFiles(extent, tabletsKeyValues), lookupFlushID(extent, tabletsKeyValues), lookupCompactID(extent, tabletsKeyValues));
   }
-  
+
   private static TServerInstance lookupLastServer(KeyExtent extent, SortedMap<Key,Value> tabletsKeyValues) {
     for (Entry<Key,Value> entry : tabletsKeyValues.entrySet()) {
       if (entry.getKey().getColumnFamily().compareTo(Constants.METADATA_LAST_LOCATION_COLUMN_FAMILY) == 0) {
@@ -1320,7 +1320,7 @@ public class Tablet {
     }
     return null;
   }
-  
+
   /**
    * yet another constructor - this one allows us to avoid costly lookups into the Metadata table if we already know the files we need - as at split time
    */
@@ -1332,68 +1332,70 @@ public class Tablet {
     this.tabletDirectory = location.toString();
     this.conf = conf;
     this.acuTableConf = tabletServer.getTableConfiguration(extent);
-    
+
     this.fs = fs;
     this.extent = extent;
     this.tabletResources = trm;
-    
+
     this.lastFlushID = initFlushID;
     this.lastCompactID = initCompactID;
-    
+
     if (extent.isRootTablet()) {
-      
+
       long rtime = Long.MIN_VALUE;
       for (String path : datafiles.keySet()) {
         String filename = new Path(path).getName();
-        
+
         FileSKVIterator reader = FileOperations.getInstance().openReader(this.location + "/" + filename, true, fs, fs.getConf(),
             tabletServer.getTableConfiguration(extent));
         long maxTime = -1;
         try {
-          
+
           while (reader.hasTop()) {
             maxTime = Math.max(maxTime, reader.getTopKey().getTimestamp());
             reader.next();
           }
-          
+
         } finally {
           reader.close();
         }
-        
+
         if (maxTime > rtime) {
           time = TabletTime.LOGICAL_TIME_ID + "" + maxTime;
           rtime = maxTime;
         }
       }
     }
-    
+
     this.tabletServer = tabletServer;
     this.logId = tabletServer.createLogId(extent);
-    
+
     this.timer = new TabletStatsKeeper();
-    
+
     setupDefaultSecurityLabels(extent);
-    
+
     tabletMemory = new TabletMemory();
     tabletTime = TabletTime.getInstance(time);
     persistedTime = tabletTime.getTime();
-    
+
     acuTableConf.addObserver(configObserver = new ConfigurationObserver() {
-      
+
       private void reloadConstraints() {
         constraintChecker.set(new ConstraintChecker(getTableConfiguration()));
       }
-      
+
+      @Override
       public void propertiesChanged() {
         reloadConstraints();
-        
+
         try {
           setupDefaultSecurityLabels(extent);
         } catch (Exception e) {
           log.error("Failed to reload default security labels for extent: " + extent.toString());
         }
       }
-      
+
+      @Override
       public void propertyChanged(String prop) {
         if (prop.startsWith(Property.TABLE_CONSTRAINT_PREFIX.getKey()))
           reloadConstraints();
@@ -1405,19 +1407,20 @@ public class Tablet {
             log.error("Failed to reload default security labels for extent: " + extent.toString());
           }
         }
-        
+
       }
-      
+
+      @Override
       public void sessionExpired() {
         log.debug("Session expired, no longer updating per table props...");
       }
-      
+
     });
     // Force a load of any per-table properties
     configObserver.propertiesChanged();
-    
+
     tabletResources.setTablet(this, acuTableConf);
-    
+
     if (!logEntries.isEmpty()) {
       log.info("Starting Write-Ahead Log recovery for " + this.extent);
       final long[] count = new long[2];
@@ -1427,8 +1430,9 @@ public class Tablet {
         Set<String> absPaths = new HashSet<String>();
         for (String relPath : datafiles.keySet())
           absPaths.add(rel2abs(relPath, extent));
-        
+
         tabletServer.recover(this, logEntries, absPaths, new MutationReceiver() {
+          @Override
           public void receive(Mutation m) {
             // LogReader.printMutation(m);
             Collection<ColumnUpdate> muts = m.getUpdates();
@@ -1443,17 +1447,17 @@ public class Tablet {
             count[0]++;
           }
         });
-        
+
         if (count[1] != Long.MIN_VALUE) {
           tabletTime.useMaxTimeFromWALog(count[1]);
         }
         commitSession.updateMaxCommittedTime(tabletTime.getTime());
-        
+
         if (count[0] == 0) {
           MetadataTable.removeUnusedWALEntries(extent, logEntries, tabletServer.getLock());
           logEntries.clear();
         }
-        
+
       } catch (Throwable t) {
         if (acuTableConf.getBoolean(Property.TABLE_FAILURES_IGNORE)) {
           log.warn("Error recovering from log files: ", t);
@@ -1469,11 +1473,11 @@ public class Tablet {
           currentLogs.add(new DfsLogger(tabletServer.getServerConfig(), logEntry.server, parts[1]));
         }
       }
-      
+
       log.info("Write-Ahead Log recovery complete for " + this.extent + " (" + count[0] + " mutations applied, " + tabletMemory.getNumEntries()
           + " entries created)");
     }
-    
+
     String contextName = acuTableConf.get(Property.TABLE_CLASSPATH);
     if (contextName != null && !contextName.equals("")) {
       // initialize context classloader, instead of possibly waiting for it to initialize for a scan
@@ -1484,11 +1488,11 @@ public class Tablet {
     // do this last after tablet is completely setup because it
     // could cause major compaction to start
     datafileManager = new DatafileManager(datafiles);
-    
+
     computeNumEntries();
-    
+
     datafileManager.removeFilesAfterScanRel(scanFiles);
-    
+
     // look for hints of a failure on the previous tablet server
     if (!logEntries.isEmpty() || needsMajorCompaction(MajorCompactionReason.NORMAL)) {
       // look for any temp files hanging around
@@ -1500,10 +1504,10 @@ public class Tablet {
       // look for any temp files hanging around
       removeOldTemporaryFiles();
     }
-    
+
     log.log(TLevel.TABLET_HIST, extent + " opened ");
   }
-  
+
   private void removeOldTemporaryFiles() {
     // remove any temporary files created by a previous tablet server
     try {
@@ -1532,18 +1536,18 @@ public class Tablet {
       }
     }
   }
-  
+
   private static Collection<String> cleanUpFiles(FileSystem fs, FileStatus[] files, Path location, boolean deleteTmp) throws IOException {
     /*
      * called in constructor and before major compactions
      */
     Collection<String> goodFiles = new ArrayList<String>(files.length);
-    
+
     for (FileStatus file : files) {
-      
+
       String path = file.getPath().toString();
       String filename = file.getPath().getName();
-      
+
       // check for incomplete major compaction, this should only occur
       // for root tablet
       if (filename.startsWith("delete+")) {
@@ -1555,35 +1559,35 @@ public class Tablet {
           continue;
         }
         // compaction did not finish, so put files back
-        
+
         // reset path and filename for rest of loop
         filename = filename.split("\\+", 3)[2];
         path = location + "/" + filename;
-        
+
         rename(fs, file.getPath(), new Path(path));
       }
-      
+
       if (filename.endsWith("_tmp")) {
         if (deleteTmp) {
           log.warn("cleaning up old tmp file: " + path);
           if (!fs.delete(file.getPath(), true))
             log.warn("Delete of tmp file: " + file.getPath().toString() + " return false");
-          
+
         }
         continue;
       }
-      
+
       if (!filename.startsWith(Constants.MAPFILE_EXTENSION + "_") && !FileOperations.getValidExtensions().contains(filename.split("\\.")[1])) {
         log.error("unknown file in tablet: " + path);
         continue;
       }
-      
+
       goodFiles.add(path);
     }
-    
+
     return goodFiles;
   }
-  
+
   public static class KVEntry extends KeyValue {
     private static final long serialVersionUID = 1L;
 
@@ -1594,58 +1598,58 @@ public class Tablet {
     int numBytes() {
       return getKey().getSize() + getValue().get().length;
     }
-    
+
     int estimateMemoryUsed() {
       return getKey().getSize() + getValue().get().length + (9 * 32); // overhead is 32 per object
     }
   }
-  
+
   private LookupResult lookup(SortedKeyValueIterator<Key,Value> mmfi, List<Range> ranges, HashSet<Column> columnSet, ArrayList<KVEntry> results,
       long maxResultsSize) throws IOException {
-    
+
     LookupResult lookupResult = new LookupResult();
-    
+
     boolean exceededMemoryUsage = false;
     boolean tabletClosed = false;
-    
+
     Set<ByteSequence> cfset = null;
     if (columnSet.size() > 0)
       cfset = LocalityGroupUtil.families(columnSet);
-    
+
     for (Range range : ranges) {
-      
+
       if (exceededMemoryUsage || tabletClosed) {
         lookupResult.unfinishedRanges.add(range);
         continue;
       }
-      
+
       int entriesAdded = 0;
-      
+
       try {
         if (cfset != null)
           mmfi.seek(range, cfset, true);
         else
           mmfi.seek(range, LocalityGroupUtil.EMPTY_CF_SET, false);
-        
+
         while (mmfi.hasTop()) {
           Key key = mmfi.getTopKey();
-          
+
           KVEntry kve = new KVEntry(key, mmfi.getTopValue());
           results.add(kve);
           entriesAdded++;
           lookupResult.bytesAdded += kve.estimateMemoryUsed();
           lookupResult.dataSize += kve.numBytes();
-          
+
           exceededMemoryUsage = lookupResult.bytesAdded > maxResultsSize;
-          
+
           if (exceededMemoryUsage) {
             addUnfinishedRange(lookupResult, range, key, false);
             break;
           }
-          
+
           mmfi.next();
         }
-        
+
       } catch (TooManyFilesException tmfe) {
         // treat this as a closed tablet, and let the client retry
         log.warn("Tablet " + getExtent() + " has too many files, batch lookup can not run");
@@ -1671,61 +1675,61 @@ public class Tablet {
         handleTabletClosedDuringScan(results, lookupResult, exceededMemoryUsage, range, entriesAdded);
         tabletClosed = true;
       }
-      
+
     }
-    
+
     return lookupResult;
   }
-  
+
   private void handleTabletClosedDuringScan(ArrayList<KVEntry> results, LookupResult lookupResult, boolean exceededMemoryUsage, Range range, int entriesAdded) {
     if (exceededMemoryUsage)
       throw new IllegalStateException("tablet should not exceed memory usage or close, not both");
-    
+
     if (entriesAdded > 0)
       addUnfinishedRange(lookupResult, range, results.get(results.size() - 1).getKey(), false);
     else
       lookupResult.unfinishedRanges.add(range);
-    
+
     lookupResult.closed = true;
   }
-  
+
   private void addUnfinishedRange(LookupResult lookupResult, Range range, Key key, boolean inclusiveStartKey) {
     if (range.getEndKey() == null || key.compareTo(range.getEndKey()) < 0) {
       Range nlur = new Range(new Key(key), inclusiveStartKey, range.getEndKey(), range.isEndKeyInclusive());
       lookupResult.unfinishedRanges.add(nlur);
     }
   }
-  
+
   public static interface KVReceiver {
     void receive(List<KVEntry> matches) throws IOException;
   }
-  
+
   class LookupResult {
     List<Range> unfinishedRanges = new ArrayList<Range>();
     long bytesAdded = 0;
     long dataSize = 0;
     boolean closed = false;
   }
-  
+
   public LookupResult lookup(List<Range> ranges, HashSet<Column> columns, Authorizations authorizations, ArrayList<KVEntry> results, long maxResultSize,
       List<IterInfo> ssiList, Map<String,Map<String,String>> ssio, AtomicBoolean interruptFlag) throws IOException {
-    
+
     if (ranges.size() == 0) {
       return new LookupResult();
     }
-    
+
     ranges = Range.mergeOverlapping(ranges);
     Collections.sort(ranges);
-    
+
     Range tabletRange = extent.toDataRange();
     for (Range range : ranges) {
       // do a test to see if this range falls within the tablet, if it does not
       // then clip will throw an exception
       tabletRange.clip(range);
     }
-    
+
     ScanDataSource dataSource = new ScanDataSource(authorizations, this.defaultSecurityLabel, columns, ssiList, ssio, interruptFlag);
-    
+
     LookupResult result = null;
 
     try {
@@ -1739,7 +1743,7 @@ public class Tablet {
       // code in finally block because always want
       // to return mapfiles, even when exception is thrown
       dataSource.close(false);
-      
+
       synchronized (this) {
         queryCount += results.size();
         if (result != null)
@@ -1747,74 +1751,74 @@ public class Tablet {
       }
     }
   }
-  
+
   private Batch nextBatch(SortedKeyValueIterator<Key,Value> iter, Range range, int num, HashSet<Column> columns) throws IOException {
-    
+
     // log.info("In nextBatch..");
-    
+
     List<KVEntry> results = new ArrayList<KVEntry>();
     Key key = null;
-    
+
     Value value;
     long resultSize = 0L;
     long resultBytes = 0L;
-    
+
     long maxResultsSize = acuTableConf.getMemoryInBytes(Property.TABLE_SCAN_MAXMEM);
-    
+
     if (columns.size() == 0) {
       iter.seek(range, LocalityGroupUtil.EMPTY_CF_SET, false);
     } else {
       iter.seek(range, LocalityGroupUtil.families(columns), true);
     }
-    
+
     Key continueKey = null;
     boolean skipContinueKey = false;
-    
+
     boolean endOfTabletReached = false;
     while (iter.hasTop()) {
-      
-      value = (Value) iter.getTopValue();
+
+      value = iter.getTopValue();
       key = iter.getTopKey();
-      
+
       KVEntry kvEntry = new KVEntry(key, value); // copies key and value
       results.add(kvEntry);
       resultSize += kvEntry.estimateMemoryUsed();
       resultBytes += kvEntry.numBytes();
-      
+
       if (resultSize >= maxResultsSize || results.size() >= num) {
         continueKey = new Key(key);
         skipContinueKey = true;
         break;
       }
-      
+
       iter.next();
     }
-    
+
     if (iter.hasTop() == false) {
       endOfTabletReached = true;
     }
-    
+
     Batch retBatch = new Batch();
     retBatch.numBytes = resultBytes;
-    
+
     if (!endOfTabletReached) {
       retBatch.continueKey = continueKey;
       retBatch.skipContinueKey = skipContinueKey;
     } else {
       retBatch.continueKey = null;
     }
-    
+
     if (endOfTabletReached && results.size() == 0)
       retBatch.results = null;
     else
       retBatch.results = results;
-    
+
     return retBatch;
   }
-  
+
   /**
    * Determine if a JVM shutdown is in progress.
-   * 
+   *
    */
   private boolean shutdownInProgress() {
     try {
@@ -1825,63 +1829,63 @@ public class Tablet {
     } catch (IllegalStateException ise) {
       return true;
     }
-    
+
     return false;
   }
-  
+
   private class Batch {
     public boolean skipContinueKey;
     public List<KVEntry> results;
     public Key continueKey;
     public long numBytes;
   }
-  
+
   Scanner createScanner(Range range, int num, HashSet<Column> columns, Authorizations authorizations, List<IterInfo> ssiList,
       Map<String,Map<String,String>> ssio, boolean isolated, AtomicBoolean interruptFlag) {
     // do a test to see if this range falls within the tablet, if it does not
     // then clip will throw an exception
     extent.toDataRange().clip(range);
-    
+
     ScanOptions opts = new ScanOptions(num, authorizations, this.defaultSecurityLabel, columns, ssiList, ssio, interruptFlag, isolated);
     return new Scanner(range, opts);
   }
-  
+
   class ScanBatch {
     boolean more;
     List<KVEntry> results;
-    
+
     ScanBatch(List<KVEntry> results, boolean more) {
       this.results = results;
       this.more = more;
     }
   }
-  
+
   class Scanner {
-    
+
     private ScanOptions options;
     private Range range;
     private SortedKeyValueIterator<Key,Value> isolatedIter;
     private ScanDataSource isolatedDataSource;
     private boolean sawException = false;
     private boolean scanClosed = false;
-    
+
     Scanner(Range range, ScanOptions options) {
       this.range = range;
       this.options = options;
     }
-    
+
     synchronized ScanBatch read() throws IOException, TabletClosedException {
-      
+
       if (sawException)
         throw new IllegalStateException("Tried to use scanner after exception occurred.");
-      
+
       if (scanClosed)
         throw new IllegalStateException("Tried to use scanner after it was closed.");
-      
+
       Batch results = null;
-      
+
       ScanDataSource dataSource;
-      
+
       if (options.isolated) {
         if (isolatedDataSource == null)
           isolatedDataSource = new ScanDataSource(options);
@@ -1889,11 +1893,11 @@ public class Tablet {
       } else {
         dataSource = new ScanDataSource(options);
       }
-      
+
       try {
-        
+
         SortedKeyValueIterator<Key,Value> iter;
-        
+
         if (options.isolated) {
           if (isolatedIter == null)
             isolatedIter = new SourceSwitchingIterator(dataSource, true);
@@ -1903,9 +1907,9 @@ public class Tablet {
         } else {
           iter = new SourceSwitchingIterator(dataSource, false);
         }
-        
+
         results = nextBatch(iter, range, options.num, options.columnSet);
-        
+
         if (results.results == null) {
           range = null;
           return new ScanBatch(new ArrayList<Tablet.KVEntry>(), false);
@@ -1915,7 +1919,7 @@ public class Tablet {
           range = new Range(results.continueKey, !results.skipContinueKey, range.getEndKey(), range.isEndKeyInclusive());
           return new ScanBatch(results.results, true);
         }
-        
+
       } catch (IterationInterruptedException iie) {
         sawException = true;
         if (isClosed())
@@ -1927,7 +1931,7 @@ public class Tablet {
           log.debug("IOException while shutdown in progress ", ioe);
           throw new TabletClosedException(ioe); // assume IOException was caused by execution of HDFS shutdown hook
         }
-        
+
         sawException = true;
         dataSource.close(true);
         throw ioe;
@@ -1941,7 +1945,7 @@ public class Tablet {
           dataSource.close(false);
         else if (dataSource.fileManager != null)
           dataSource.fileManager.detach();
-        
+
         synchronized (Tablet.this) {
           if (results != null && results.results != null) {
             long more = results.results.size();
@@ -1951,7 +1955,7 @@ public class Tablet {
         }
       }
     }
-    
+
     // close and read are synchronized because can not call close on the data source while it is in use
     // this could lead to the case where file iterators that are in use by a thread are returned
     // to the pool... this would be bad
@@ -1964,9 +1968,9 @@ public class Tablet {
       }
     }
   }
-  
+
   static class ScanOptions {
-    
+
     // scan options
     Authorizations authorizations;
     byte[] defaultLabels;
@@ -1976,7 +1980,7 @@ public class Tablet {
     AtomicBoolean interruptFlag;
     int num;
     boolean isolated;
-    
+
     ScanOptions(int num, Authorizations authorizations, byte[] defaultLabels, HashSet<Column> columnSet, List<IterInfo> ssiList,
         Map<String,Map<String,String>> ssio, AtomicBoolean interruptFlag, boolean isolated) {
       this.num = num;
@@ -1988,11 +1992,11 @@ public class Tablet {
       this.interruptFlag = interruptFlag;
       this.isolated = isolated;
     }
-    
+
   }
-  
+
   class ScanDataSource implements DataSource {
-    
+
     // data source state
     private ScanFileManager fileManager;
     private SortedKeyValueIterator<Key,Value> iter;
@@ -2001,22 +2005,22 @@ public class Tablet {
     private long fileReservationId;
     private AtomicBoolean interruptFlag;
     private StatsIterator statsIterator;
-    
+
     ScanOptions options;
-    
+
     ScanDataSource(Authorizations authorizations, byte[] defaultLabels, HashSet<Column> columnSet, List<IterInfo> ssiList, Map<String,Map<String,String>> ssio,
         AtomicBoolean interruptFlag) {
       expectedDeletionCount = dataSourceDeletions.get();
       this.options = new ScanOptions(-1, authorizations, defaultLabels, columnSet, ssiList, ssio, interruptFlag, false);
       this.interruptFlag = interruptFlag;
     }
-    
+
     ScanDataSource(ScanOptions options) {
       expectedDeletionCount = dataSourceDeletions.get();
       this.options = options;
       this.interruptFlag = options.interruptFlag;
     }
-    
+
     @Override
     public DataSource getNewDataSource() {
       if (!isCurrent()) {
@@ -2027,122 +2031,122 @@ public class Tablet {
           datafileManager.returnFilesForScan(fileReservationId);
           fileReservationId = -1;
         }
-        
+
         if (fileManager != null)
           fileManager.releaseOpenFiles(false);
-        
+
         expectedDeletionCount = dataSourceDeletions.get();
         iter = null;
-        
+
         return this;
       } else
         return this;
     }
-    
+
     @Override
     public boolean isCurrent() {
       return expectedDeletionCount == dataSourceDeletions.get();
     }
-    
+
     @Override
     public SortedKeyValueIterator<Key,Value> iterator() throws IOException {
       if (iter == null)
         iter = createIterator();
       return iter;
     }
-    
+
     private SortedKeyValueIterator<Key,Value> createIterator() throws IOException {
-      
+
       Map<String,DataFileValue> files;
-      
+
       synchronized (Tablet.this) {
-        
+
         if (memIters != null)
           throw new IllegalStateException("Tried to create new scan iterator w/o releasing memory");
-        
+
         if (Tablet.this.closed)
           throw new TabletClosedException();
-        
+
         if (interruptFlag.get())
           throw new IterationInterruptedException(extent.toString() + " " + interruptFlag.hashCode());
-        
+
         // only acquire the file manager when we know the tablet is open
         if (fileManager == null) {
           fileManager = tabletResources.newScanFileManager();
           activeScans.add(this);
         }
-        
+
         if (fileManager.getNumOpenFiles() != 0)
           throw new IllegalStateException("Tried to create new scan iterator w/o releasing files");
-        
+
         // set this before trying to get iterators in case
         // getIterators() throws an exception
         expectedDeletionCount = dataSourceDeletions.get();
-        
+
         memIters = tabletMemory.getIterators();
         Pair<Long,Map<String,DataFileValue>> reservation = datafileManager.reserveFilesForScan();
         fileReservationId = reservation.getFirst();
         files = reservation.getSecond();
       }
-      
+
       Collection<InterruptibleIterator> mapfiles = fileManager.openFiles(files, options.isolated);
-      
+
       List<SortedKeyValueIterator<Key,Value>> iters = new ArrayList<SortedKeyValueIterator<Key,Value>>(mapfiles.size() + memIters.size());
-      
+
       iters.addAll(mapfiles);
       iters.addAll(memIters);
-      
+
       for (SortedKeyValueIterator<Key,Value> skvi : iters)
         ((InterruptibleIterator) skvi).setInterruptFlag(interruptFlag);
-      
+
       MultiIterator multiIter = new MultiIterator(iters, extent);
-      
+
       TabletIteratorEnvironment iterEnv = new TabletIteratorEnvironment(IteratorScope.scan, acuTableConf, fileManager, files);
-      
+
       statsIterator = new StatsIterator(multiIter, TabletServer.seekCount, scannedCount);
-      
+
       DeletingIterator delIter = new DeletingIterator(statsIterator, false);
-      
+
       ColumnFamilySkippingIterator cfsi = new ColumnFamilySkippingIterator(delIter);
-      
+
       ColumnQualifierFilter colFilter = new ColumnQualifierFilter(cfsi, options.columnSet);
-      
+
       VisibilityFilter visFilter = new VisibilityFilter(colFilter, options.authorizations, options.defaultLabels);
-      
+
       return iterEnv.getTopLevelIterator(IteratorUtil
           .loadIterators(IteratorScope.scan, visFilter, extent, acuTableConf, options.ssiList, options.ssio, iterEnv));
     }
-    
+
     private void close(boolean sawErrors) {
-      
+
       if (memIters != null) {
         tabletMemory.returnIterators(memIters);
         memIters = null;
         datafileManager.returnFilesForScan(fileReservationId);
         fileReservationId = -1;
       }
-      
+
       synchronized (Tablet.this) {
         activeScans.remove(this);
         if (activeScans.size() == 0)
           Tablet.this.notifyAll();
       }
-      
+
       if (fileManager != null) {
         fileManager.releaseOpenFiles(sawErrors);
         fileManager = null;
       }
-      
+
       if (statsIterator != null) {
         statsIterator.report();
       }
 
     }
-    
+
     public void interrupt() {
       interruptFlag.set(true);
     }
-    
+
     @Override
     public DataSource getDeepCopyDataSource(IteratorEnvironment env) {
       throw new UnsupportedOperationException();
@@ -2152,28 +2156,28 @@ public class Tablet {
     public void setInterruptFlag(AtomicBoolean flag) {
       throw new UnsupportedOperationException();
     }
-    
+
   }
-  
+
   private DataFileValue minorCompact(Configuration conf, FileSystem fs, InMemoryMap memTable, String tmpDatafile, String newDatafile, String mergeFile,
       boolean hasQueueTime, long queued, CommitSession commitSession, long flushId, MinorCompactionReason mincReason) {
     boolean failed = false;
     long start = System.currentTimeMillis();
     timer.incrementStatusMinor();
-    
+
     long count = 0;
-    
+
     try {
       Span span = Trace.start("write");
       count = memTable.getNumEntries();
-      
+
       DataFileValue dfv = null;
       if (mergeFile != null)
         dfv = datafileManager.getDatafileSizes().get(mergeFile);
-      
+
       MinorCompactor compactor = new MinorCompactor(conf, fs, memTable, mergeFile, dfv, tmpDatafile, acuTableConf, extent, mincReason);
       CompactionStats stats = compactor.call();
-      
+
       span.stop();
       span = Trace.start("bringOnline");
       datafileManager.bringMinorCompactionOnline(tmpDatafile, newDatafile, mergeFile, new DataFileValue(stats.getFileSize(), stats.getEntriesWritten()),
@@ -2193,7 +2197,7 @@ public class Tablet {
       } catch (Throwable t) {
         log.error("Failed to free tablet memory", t);
       }
-      
+
       if (!failed) {
         lastMinorCompactionFinishTime = System.currentTimeMillis();
       }
@@ -2207,16 +2211,16 @@ public class Tablet {
         timer.updateTime(Operation.MINOR, start, count, failed);
     }
   }
-  
+
   private class MinorCompactionTask implements Runnable {
-    
+
     private long queued;
     private CommitSession commitSession;
     private DataFileValue stats;
     private String mergeFile;
     private long flushId;
     private MinorCompactionReason mincReason;
-    
+
     MinorCompactionTask(String mergeFile, CommitSession commitSession, long flushId, MinorCompactionReason mincReason) {
       queued = System.currentTimeMillis();
       minorCompactionWaitingToStart = true;
@@ -2225,7 +2229,8 @@ public class Tablet {
       this.flushId = flushId;
       this.mincReason = mincReason;
     }
-    
+
+    @Override
     public void run() {
       minorCompactionWaitingToStart = false;
       minorCompactionInProgress = true;
@@ -2256,7 +2261,7 @@ public class Tablet {
         this.stats = minorCompact(conf, fs, tabletMemory.getMinCMemTable(), newMapfileLocation + "_tmp", newMapfileLocation, mergeFile, true, queued,
             commitSession, flushId, mincReason);
         span.stop();
-        
+
         if (needsSplit()) {
           tabletServer.executeSplit(Tablet.this);
         } else {
@@ -2274,36 +2279,36 @@ public class Tablet {
       }
     }
   }
-  
+
   private synchronized MinorCompactionTask prepareForMinC(long flushId, MinorCompactionReason mincReason) {
     CommitSession oldCommitSession = tabletMemory.prepareForMinC();
     otherLogs = currentLogs;
     currentLogs = new HashSet<DfsLogger>();
-    
+
     String mergeFile = datafileManager.reserveMergingMinorCompactionFile();
-    
+
     return new MinorCompactionTask(mergeFile, oldCommitSession, flushId, mincReason);
-    
+
   }
-  
+
   void flush(long tableFlushID) {
     boolean updateMetadata = false;
     boolean initiateMinor = false;
-    
+
     try {
-      
+
       synchronized (this) {
-        
+
         // only want one thing at a time to update flush ID to ensure that metadata table and tablet in memory state are consistent
         if (updatingFlushID)
           return;
-        
+
         if (lastFlushID >= tableFlushID)
           return;
-        
+
         if (closing || closed || tabletMemory.memoryReservedForMinC())
           return;
-        
+
         if (tabletMemory.getMemTable().getNumEntries() == 0) {
           lastFlushID = tableFlushID;
           updatingFlushID = true;
@@ -2311,7 +2316,7 @@ public class Tablet {
         } else
           initiateMinor = true;
       }
-      
+
       if (updateMetadata) {
         TCredentials creds = SecurityConstants.getSystemCredentials();
         // if multiple threads were allowed to update this outside of a sync block, then it would be
@@ -2319,7 +2324,7 @@ public class Tablet {
         MetadataTable.updateTabletFlushID(extent, tableFlushID, creds, tabletServer.getLock());
       } else if (initiateMinor)
         initiateMinorCompaction(tableFlushID, MinorCompactionReason.USER);
-      
+
     } finally {
       if (updateMetadata) {
         synchronized (this) {
@@ -2328,9 +2333,9 @@ public class Tablet {
         }
       }
     }
-    
+
   }
-  
+
   boolean initiateMinorCompaction(MinorCompactionReason mincReason) {
     if (isClosed()) {
       // don't bother trying to get flush id if closed... could be closed after this check but that is ok... just trying to cut down on unneeded log messages....
@@ -2347,7 +2352,7 @@ public class Tablet {
     }
     return initiateMinorCompaction(flushId, mincReason);
   }
-  
+
   boolean minorCompactNow(MinorCompactionReason mincReason) {
     long flushId;
     try {
@@ -2370,22 +2375,22 @@ public class Tablet {
     tabletResources.executeMinorCompaction(mct);
     return true;
   }
-  
+
   private MinorCompactionTask createMinorCompactionTask(long flushId, MinorCompactionReason mincReason) {
     MinorCompactionTask mct;
     long t1, t2;
-    
+
     StringBuilder logMessage = null;
-    
+
     try {
       synchronized (this) {
         t1 = System.currentTimeMillis();
-        
+
         if (closing || closed || majorCompactionWaitingToStart || tabletMemory.memoryReservedForMinC() || tabletMemory.getMemTable().getNumEntries() == 0
             || updatingFlushID) {
-          
+
           logMessage = new StringBuilder();
-          
+
           logMessage.append(extent.toString());
           logMessage.append(" closing " + closing);
           logMessage.append(" closed " + closed);
@@ -2395,7 +2400,7 @@ public class Tablet {
           if (tabletMemory != null && tabletMemory.getMemTable() != null)
             logMessage.append(" tabletMemory.getMemTable().getNumEntries() " + tabletMemory.getMemTable().getNumEntries());
           logMessage.append(" updatingFlushID " + updatingFlushID);
-          
+
           return null;
         }
         // We're still recovering log entries
@@ -2405,7 +2410,7 @@ public class Tablet {
           logMessage.append(" datafileManager " + datafileManager);
           return null;
         }
-        
+
         mct = prepareForMinC(flushId, mincReason);
         t2 = System.currentTimeMillis();
       }
@@ -2414,11 +2419,11 @@ public class Tablet {
       if (logMessage != null && log.isDebugEnabled())
         log.debug(logMessage);
     }
-    
+
     log.debug(String.format("MinC initiate lock %.2f secs", (t2 - t1) / 1000.0));
     return mct;
   }
-  
+
   long getFlushID() throws NoNodeException {
     try {
       String zTablePath = Constants.ZROOT + "/" + HdfsZooInstance.getInstance().getInstanceID() + Constants.ZTABLES + "/" + extent.getTableId()
@@ -2436,11 +2441,11 @@ public class Tablet {
       }
     }
   }
-  
+
   long getCompactionCancelID() {
     String zTablePath = Constants.ZROOT + "/" + HdfsZooInstance.getInstance().getInstanceID() + Constants.ZTABLES + "/" + extent.getTableId()
         + Constants.ZTABLE_COMPACT_CANCEL_ID;
-    
+
     try {
       return Long.parseLong(new String(ZooReaderWriter.getRetryingInstance().getData(zTablePath, null), UTF_8));
     } catch (KeeperException e) {
@@ -2454,32 +2459,32 @@ public class Tablet {
     try {
       String zTablePath = Constants.ZROOT + "/" + HdfsZooInstance.getInstance().getInstanceID() + Constants.ZTABLES + "/" + extent.getTableId()
           + Constants.ZTABLE_COMPACT_ID;
-      
+
       String[] tokens = new String(ZooReaderWriter.getRetryingInstance().getData(zTablePath, null), UTF_8).split(",");
       long compactID = Long.parseLong(tokens[0]);
-      
+
       CompactionIterators iters = new CompactionIterators();
 
       if (tokens.length > 1) {
         Hex hex = new Hex();
         ByteArrayInputStream bais = new ByteArrayInputStream(hex.decode(tokens[1].split("=")[1].getBytes(UTF_8)));
         DataInputStream dis = new DataInputStream(bais);
-        
+
         try {
           iters.readFields(dis);
         } catch (IOException e) {
           throw new RuntimeException(e);
         }
-        
+
         KeyExtent ke = new KeyExtent(extent.getTableId(), iters.getEndRow(), iters.getStartRow());
-        
+
         if (!ke.overlaps(extent)) {
           // only use iterators if compaction range overlaps
           iters = new CompactionIterators();
         }
       }
 
-      
+
       return new Pair<Long,List<IteratorSetting>>(compactID, iters.getIterators());
     } catch (InterruptedException e) {
       throw new RuntimeException(e);
@@ -2495,62 +2500,62 @@ public class Tablet {
       throw new RuntimeException(e);
     }
   }
-  
+
   public synchronized void waitForMinC() {
     tabletMemory.waitForMinC();
   }
-  
+
   static class TConstraintViolationException extends Exception {
     private static final long serialVersionUID = 1L;
     private Violations violations;
     private List<Mutation> violators;
     private List<Mutation> nonViolators;
     private CommitSession commitSession;
-    
+
     TConstraintViolationException(Violations violations, List<Mutation> violators, List<Mutation> nonViolators, CommitSession commitSession) {
       this.violations = violations;
       this.violators = violators;
       this.nonViolators = nonViolators;
       this.commitSession = commitSession;
     }
-    
+
     Violations getViolations() {
       return violations;
     }
-    
+
     List<Mutation> getViolators() {
       return violators;
     }
-    
+
     List<Mutation> getNonViolators() {
       return nonViolators;
     }
-    
+
     CommitSession getCommitSession() {
       return commitSession;
     }
   }
-  
+
   private synchronized CommitSession finishPreparingMutations(long time) {
     if (writesInProgress < 0) {
       throw new IllegalStateException("waitingForLogs < 0 " + writesInProgress);
     }
-    
+
     if (closed || tabletMemory == null) {
       // log.debug("tablet closed, can't commit");
       return null;
     }
-    
+
     writesInProgress++;
     CommitSession commitSession = tabletMemory.getCommitSession();
     commitSession.incrementCommitsInProgress();
     commitSession.updateMaxCommittedTime(time);
     return commitSession;
   }
-  
+
   public void checkConstraints() {
     ConstraintChecker cc = constraintChecker.get();
-    
+
     if (cc.classLoaderChanged()) {
       ConstraintChecker ncc = new ConstraintChecker(getTableConfiguration());
       constraintChecker.compareAndSet(cc, ncc);
@@ -2558,9 +2563,9 @@ public class Tablet {
   }
 
   public CommitSession prepareMutationsForCommit(TservConstraintEnv cenv, List<Mutation> mutations) throws TConstraintViolationException {
-    
+
     ConstraintChecker cc = constraintChecker.get();
-    
+
     List<Mutation> violators = null;
     Violations violations = new Violations();
     cenv.setExtent(extent);
@@ -2573,22 +2578,22 @@ public class Tablet {
         violators.add(mutation);
       }
     }
-    
+
     long time = tabletTime.setUpdateTimes(mutations);
-    
+
     if (!violations.isEmpty()) {
-      
+
       HashSet<Mutation> violatorsSet = new HashSet<Mutation>(violators);
       ArrayList<Mutation> nonViolators = new ArrayList<Mutation>();
-      
+
       for (Mutation mutation : mutations) {
         if (!violatorsSet.contains(mutation)) {
           nonViolators.add(mutation);
         }
       }
-      
+
       CommitSession commitSession = null;
-      
+
       if (nonViolators.size() > 0) {
         // if everything is a violation, then it is expected that
         // code calling this will not log or commit
@@ -2596,66 +2601,66 @@ public class Tablet {
         if (commitSession == null)
           return null;
       }
-      
+
       throw new TConstraintViolationException(violations, violators, nonViolators, commitSession);
     }
-    
+
     return finishPreparingMutations(time);
   }
-  
+
   public synchronized void abortCommit(CommitSession commitSession, List<Mutation> value) {
     if (writesInProgress <= 0) {
       throw new IllegalStateException("waitingForLogs <= 0 " + writesInProgress);
     }
-    
+
     if (closeComplete || tabletMemory == null) {
       throw new IllegalStateException("aborting commit when tablet is closed");
     }
-    
+
     commitSession.decrementCommitsInProgress();
     writesInProgress--;
     if (writesInProgress == 0)
       this.notifyAll();
   }
-  
+
   public void commit(CommitSession commitSession, List<Mutation> mutations) {
-    
+
     int totalCount = 0;
     long totalBytes = 0;
-    
+
     // write the mutation to the in memory table
     for (Mutation mutation : mutations) {
       totalCount += mutation.size();
       totalBytes += mutation.numBytes();
     }
-    
+
     tabletMemory.mutate(commitSession, mutations);
-    
+
     synchronized (this) {
       if (writesInProgress < 1) {
         throw new IllegalStateException("commiting mutations after logging, but not waiting for any log messages");
       }
-      
+
       if (closed && closeComplete) {
         throw new IllegalStateException("tablet closed with outstanding messages to the logger");
       }
-      
+
       tabletMemory.updateMemoryUsageStats();
-      
+
       // decrement here in case an exception is thrown below
       writesInProgress--;
       if (writesInProgress == 0)
         this.notifyAll();
-      
+
       commitSession.decrementCommitsInProgress();
-      
+
       numEntries += totalCount;
       numEntriesInMemory += totalCount;
       ingestCount += totalCount;
       ingestBytes += totalBytes;
     }
   }
-  
+
   /**
    * Closes the mapfiles associated with a Tablet. If saveState is true, a minor compaction is performed.
    */
@@ -2663,17 +2668,17 @@ public class Tablet {
     initiateClose(saveState, false, false);
     completeClose(saveState, true);
   }
-  
+
   void initiateClose(boolean saveState, boolean queueMinC, boolean disableWrites) {
-    
+
     if (!saveState && queueMinC) {
       throw new IllegalArgumentException("Not saving state on close and requesting minor compactions queue does not make sense");
     }
-    
+
     log.debug("initiateClose(saveState=" + saveState + " queueMinC=" + queueMinC + " disableWrites=" + disableWrites + ") " + getExtent());
-    
+
     MinorCompactionTask mct = null;
-    
+
     synchronized (this) {
       if (closed || closing || closeComplete) {
         String msg = "Tablet " + getExtent() + " already";
@@ -2685,15 +2690,15 @@ public class Tablet {
           msg += " closeComplete";
         throw new IllegalStateException(msg);
       }
-      
+
       // enter the closing state, no splits, minor, or major compactions can start
       // should cause running major compactions to stop
       closing = true;
       this.notifyAll();
-      
+
       // determines if inserts and queries can still continue while minor compacting
       closed = disableWrites;
-      
+
       // wait for major compactions to finish, setting closing to
       // true should cause any running major compactions to abort
       while (majorCompactionInProgress) {
@@ -2703,7 +2708,7 @@ public class Tablet {
           log.error(e.toString());
         }
       }
-      
+
       while (updatingFlushID) {
         try {
           this.wait(50);
@@ -2715,50 +2720,50 @@ public class Tablet {
       if (!saveState || tabletMemory.getMemTable().getNumEntries() == 0) {
         return;
       }
-      
+
       tabletMemory.waitForMinC();
-      
+
       try {
         mct = prepareForMinC(getFlushID(), MinorCompactionReason.CLOSE);
       } catch (NoNodeException e) {
         throw new RuntimeException(e);
       }
-      
+
       if (queueMinC) {
         tabletResources.executeMinorCompaction(mct);
         return;
       }
-      
+
     }
-    
+
     // do minor compaction outside of synch block so that tablet can be read and written to while
     // compaction runs
     mct.run();
   }
-  
+
   private boolean closeCompleting = false;
-  
+
   synchronized void completeClose(boolean saveState, boolean completeClose) throws IOException {
-    
+
     if (!closing || closeComplete || closeCompleting) {
       throw new IllegalStateException("closing = " + closing + " closed = " + closed + " closeComplete = " + closeComplete + " closeCompleting = "
           + closeCompleting);
     }
-    
+
     log.debug("completeClose(saveState=" + saveState + " completeClose=" + completeClose + ") " + getExtent());
-    
+
     // ensure this method is only called once, also guards against multiple
     // threads entering the method at the same time
     closeCompleting = true;
     closed = true;
-    
+
     // modify dataSourceDeletions so scans will try to switch data sources and fail because the tablet is closed
     dataSourceDeletions.incrementAndGet();
-    
+
     for (ScanDataSource activeScan : activeScans) {
       activeScan.interrupt();
     }
-    
+
     // wait for reads and writes to complete
     while (writesInProgress > 0 || activeScans.size() > 0) {
       try {
@@ -2767,9 +2772,9 @@ public class Tablet {
         log.error(e.toString());
       }
     }
-    
+
     tabletMemory.waitForMinC();
-    
+
     if (saveState && tabletMemory.getMemTable().getNumEntries() > 0) {
       try {
         prepareForMinC(getFlushID(), MinorCompactionReason.CLOSE).run();
@@ -2777,7 +2782,7 @@ public class Tablet {
         throw new RuntimeException(e);
       }
     }
-    
+
     if (saveState) {
       // at this point all tablet data is flushed, so do a consistency check
       RuntimeException err = null;
@@ -2796,48 +2801,48 @@ public class Tablet {
         log.error("Tablet closed consistency check has failed for " + this.extent + " giving up and closing");
       }
     }
-    
+
     try {
       tabletMemory.getMemTable().delete(0);
     } catch (Throwable t) {
       log.error("Failed to delete mem table : " + t.getMessage(), t);
     }
-    
+
     tabletMemory = null;
-    
+
     // close map files
     tabletResources.close();
-    
+
     log.log(TLevel.TABLET_HIST, extent + " closed");
-    
+
     acuTableConf.removeObserver(configObserver);
-    
+
     closeComplete = completeClose;
   }
-  
+
   private void closeConsistencyCheck() {
-    
+
     if (tabletMemory.getMemTable().getNumEntries() != 0) {
       String msg = "Closed tablet " + extent + " has " + tabletMemory.getMemTable().getNumEntries() + " entries in memory";
       log.error(msg);
       throw new RuntimeException(msg);
     }
-    
+
     if (tabletMemory.memoryReservedForMinC()) {
       String msg = "Closed tablet " + extent + " has minor compacting memory";
       log.error(msg);
       throw new RuntimeException(msg);
     }
-    
+
     try {
       Pair<List<LogEntry>,SortedMap<String,DataFileValue>> fileLog = MetadataTable.getFileAndLogEntries(SecurityConstants.getSystemCredentials(), extent);
-      
+
       if (fileLog.getFirst().size() != 0) {
         String msg = "Closed tablet " + extent + " has walog entries in !METADATA " + fileLog.getFirst();
         log.error(msg);
         throw new RuntimeException(msg);
       }
-      
+
       if (extent.isRootTablet()) {
         if (!fileLog.getSecond().keySet().equals(datafileManager.getDatafileSizesRel().keySet())) {
           String msg = "Data file in !METADATA differ from in memory data " + extent + "  " + fileLog.getSecond().keySet() + "  "
@@ -2853,59 +2858,60 @@ public class Tablet {
           throw new RuntimeException(msg);
         }
       }
-      
+
     } catch (Exception e) {
       String msg = "Failed to do close consistency check for tablet " + extent;
       log.error(msg, e);
       throw new RuntimeException(msg, e);
-      
+
     }
-    
+
     if (otherLogs.size() != 0 || currentLogs.size() != 0) {
       String msg = "Closed tablet " + extent + " has walog entries in memory currentLogs = " + currentLogs + "  otherLogs = " + otherLogs;
       log.error(msg);
       throw new RuntimeException(msg);
     }
-    
+
     // TODO check lastFlushID and lastCompactID - ACCUMULO-1290
   }
-  
+
   /**
    * Returns a Path object representing the tablet's location on the DFS.
-   * 
+   *
    * @return location
    */
   public Path getLocation() {
     return location;
   }
-  
+
   private class CompactionRunner implements Runnable, Comparable<CompactionRunner> {
-    
+
     long queued;
     long start;
     boolean failed = false;
     private MajorCompactionReason reason;
-    
+
     public CompactionRunner(MajorCompactionReason reason) {
       queued = System.currentTimeMillis();
       this.reason = reason;
     }
-    
+
+    @Override
     public void run() {
       CompactionStats majCStats = null;
-      
+
       if (tabletServer.isMajorCompactionDisabled()) {
         // this will make compaction task that were queued when shutdown was
         // initiated exit
         majorCompactionQueued.remove(reason);
         return;
       }
-      
+
       try {
         timer.incrementStatusMajor();
         start = System.currentTimeMillis();
         majCStats = majorCompact(reason);
-        
+
         // if there is more work to be done, queue another major compaction
         synchronized (Tablet.this) {
           if (reason == MajorCompactionReason.NORMAL && needsMajorCompaction(reason))
@@ -2919,12 +2925,12 @@ public class Tablet {
         if (majCStats != null) {
           count = majCStats.getEntriesRead();
         }
-        
+
         timer.updateTime(Operation.MAJOR, queued, start, count, failed);
       }
     }
-    
-    // We used to synchronize on the Tablet before fetching this information, 
+
+    // We used to synchronize on the Tablet before fetching this information,
     // but this method is called by the compaction queue thread to re-order the compactions.
     // The compaction queue holds a lock during this sort.
     // A tablet lock can be held while putting itself on the queue, so we can't lock the tablet
@@ -2933,40 +2939,40 @@ public class Tablet {
     private int getNumFiles() {
       return datafileManager.datafileSizes.size();
     }
-    
+
     @Override
     public int compareTo(CompactionRunner o) {
       int cmp = reason.compareTo(o.reason);
       if (cmp != 0)
         return cmp;
-      
+
       if (reason == MajorCompactionReason.USER || reason == MajorCompactionReason.CHOP) {
         // for these types of compactions want to do the oldest first
         cmp = (int) (queued - o.queued);
         if (cmp != 0)
           r

<TRUNCATED>

Mime
View raw message