accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ctubb...@apache.org
Subject [45/66] [abbrv] accumulo git commit: ACCUMULO-3451 Format master branch (1.7.0-SNAPSHOT)
Date Fri, 09 Jan 2015 02:44:49 GMT
http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/core/src/main/java/org/apache/accumulo/core/client/impl/TabletLocatorImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/TabletLocatorImpl.java b/core/src/main/java/org/apache/accumulo/core/client/impl/TabletLocatorImpl.java
index 1d49647..659d877 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/TabletLocatorImpl.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/TabletLocatorImpl.java
@@ -52,23 +52,23 @@ import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
 
 public class TabletLocatorImpl extends TabletLocator {
-  
+
   private static final Logger log = Logger.getLogger(TabletLocatorImpl.class);
-  
+
   // there seems to be a bug in TreeMap.tailMap related to
   // putting null in the treemap.. therefore instead of
   // putting null, put MAX_TEXT
   static final Text MAX_TEXT = new Text();
-  
+
   private static class EndRowComparator implements Comparator<Text>, Serializable {
-    
+
     private static final long serialVersionUID = 1L;
 
     @Override
     public int compare(Text o1, Text o2) {
-      
+
       int ret;
-      
+
       if (o1 == MAX_TEXT)
         if (o2 == MAX_TEXT)
           ret = 0;
@@ -78,21 +78,21 @@ public class TabletLocatorImpl extends TabletLocator {
         ret = -1;
       else
         ret = o1.compareTo(o2);
-      
+
       return ret;
     }
-    
+
   }
-  
+
   static final EndRowComparator endRowComparator = new EndRowComparator();
-  
+
   protected Text tableId;
   protected TabletLocator parent;
   protected TreeMap<Text,TabletLocation> metaCache = new TreeMap<Text,TabletLocation>(endRowComparator);
   protected TabletLocationObtainer locationObtainer;
   private TabletServerLockChecker lockChecker;
   protected Text lastTabletRow;
-  
+
   private TreeSet<KeyExtent> badExtents = new TreeSet<KeyExtent>();
   private ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
   private final Lock rLock = rwLock.readLock();
@@ -104,11 +104,11 @@ public class TabletLocatorImpl extends TabletLocator {
      */
     TabletLocations lookupTablet(ClientContext context, TabletLocation src, Text row, Text stopRow, TabletLocator parent) throws AccumuloSecurityException,
         AccumuloException;
-    
+
     List<TabletLocation> lookupTablets(ClientContext context, String tserver, Map<KeyExtent,List<Range>> map, TabletLocator parent)
         throws AccumuloSecurityException, AccumuloException;
   }
-  
+
   public static interface TabletServerLockChecker {
     boolean isLockHeld(String tserver, String session);
 
@@ -116,36 +116,36 @@ public class TabletLocatorImpl extends TabletLocator {
   }
 
   private class LockCheckerSession {
-    
+
     private HashSet<Pair<String,String>> okLocks = new HashSet<Pair<String,String>>();
     private HashSet<Pair<String,String>> invalidLocks = new HashSet<Pair<String,String>>();
-    
+
     private TabletLocation checkLock(TabletLocation tl) {
       // the goal of this class is to minimize calls out to lockChecker under that assumption that its a resource synchronized among many threads... want to
       // avoid fine grained synchronization when binning lots of mutations or ranges... remember decisions from the lockChecker in thread local unsynchronized
       // memory
-      
+
       if (tl == null)
         return null;
 
       Pair<String,String> lock = new Pair<String,String>(tl.tablet_location, tl.tablet_session);
-      
+
       if (okLocks.contains(lock))
         return tl;
-      
+
       if (invalidLocks.contains(lock))
         return null;
-      
+
       if (lockChecker.isLockHeld(tl.tablet_location, tl.tablet_session)) {
         okLocks.add(lock);
         return tl;
       }
-      
+
       if (log.isTraceEnabled())
         log.trace("Tablet server " + tl.tablet_location + " " + tl.tablet_session + " no longer holds its lock");
-      
+
       invalidLocks.add(lock);
-      
+
       return null;
     }
   }
@@ -155,34 +155,34 @@ public class TabletLocatorImpl extends TabletLocator {
     this.parent = parent;
     this.locationObtainer = tlo;
     this.lockChecker = tslc;
-    
+
     this.lastTabletRow = new Text(tableId);
     lastTabletRow.append(new byte[] {'<'}, 0, 1);
   }
-  
+
   @Override
   public <T extends Mutation> void binMutations(ClientContext context, List<T> mutations, Map<String,TabletServerMutations<T>> binnedMutations, List<T> failures)
       throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
-    
+
     OpTimer opTimer = null;
     if (log.isTraceEnabled())
       opTimer = new OpTimer(log, Level.TRACE).start("Binning " + mutations.size() + " mutations for table " + tableId);
-    
+
     ArrayList<T> notInCache = new ArrayList<T>();
     Text row = new Text();
-    
+
     LockCheckerSession lcSession = new LockCheckerSession();
 
     rLock.lock();
     try {
       processInvalidated(context, lcSession);
-      
+
       // for this to be efficient rows need to be in sorted order, but always sorting is slow... therefore only sort the
       // stuff not in the cache.... it is most efficient to pass _locateTablet rows in sorted order
-      
+
       // For this to be efficient, need to avoid fine grained synchronization and fine grained logging.
       // Therefore methods called by this are not synchronized and should not log.
-      
+
       for (T mutation : mutations) {
         row.set(mutation.getRow());
         TabletLocation tl = locateTabletInCache(row);
@@ -192,7 +192,7 @@ public class TabletLocatorImpl extends TabletLocator {
     } finally {
       rLock.unlock();
     }
-    
+
     if (notInCache.size() > 0) {
       Collections.sort(notInCache, new Comparator<Mutation>() {
         @Override
@@ -200,7 +200,7 @@ public class TabletLocatorImpl extends TabletLocator {
           return WritableComparator.compareBytes(o1.getRow(), 0, o1.getRow().length, o2.getRow(), 0, o2.getRow().length);
         }
       });
-      
+
       wLock.lock();
       try {
         boolean failed = false;
@@ -211,11 +211,11 @@ public class TabletLocatorImpl extends TabletLocator {
             failures.add(mutation);
             continue;
           }
-          
+
           row.set(mutation.getRow());
-          
+
           TabletLocation tl = _locateTablet(context, row, false, false, false, lcSession);
-          
+
           if (tl == null || !addMutation(binnedMutations, mutation, tl, lcSession)) {
             failures.add(mutation);
             failed = true;
@@ -233,7 +233,7 @@ public class TabletLocatorImpl extends TabletLocator {
   private <T extends Mutation> boolean addMutation(Map<String,TabletServerMutations<T>> binnedMutations, T mutation, TabletLocation tl,
       LockCheckerSession lcSession) {
     TabletServerMutations<T> tsm = binnedMutations.get(tl.tablet_location);
-    
+
     if (tsm == null) {
       // do lock check once per tserver here to make binning faster
       boolean lockHeld = lcSession.checkLock(tl) != null;
@@ -244,50 +244,50 @@ public class TabletLocatorImpl extends TabletLocator {
         return false;
       }
     }
-    
+
     // its possible the same tserver could be listed with different sessions
     if (tsm.getSession().equals(tl.tablet_session)) {
       tsm.addMutation(tl.tablet_extent, mutation);
       return true;
     }
-    
+
     return false;
   }
-  
+
   private List<Range> binRanges(ClientContext context, List<Range> ranges, Map<String,Map<KeyExtent,List<Range>>> binnedRanges, boolean useCache,
       LockCheckerSession lcSession) throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
     List<Range> failures = new ArrayList<Range>();
     List<TabletLocation> tabletLocations = new ArrayList<TabletLocation>();
-    
+
     boolean lookupFailed = false;
-    
+
     l1: for (Range range : ranges) {
-      
+
       tabletLocations.clear();
-      
+
       Text startRow;
-      
+
       if (range.getStartKey() != null) {
         startRow = range.getStartKey().getRow();
       } else
         startRow = new Text();
-      
+
       TabletLocation tl = null;
-      
+
       if (useCache)
         tl = lcSession.checkLock(locateTabletInCache(startRow));
       else if (!lookupFailed)
         tl = _locateTablet(context, startRow, false, false, false, lcSession);
-      
+
       if (tl == null) {
         failures.add(range);
         if (!useCache)
           lookupFailed = true;
         continue;
       }
-      
+
       tabletLocations.add(tl);
-      
+
       while (tl.tablet_extent.getEndRow() != null && !range.afterEndKey(new Key(tl.tablet_extent.getEndRow()).followingKey(PartialKey.ROW))) {
         if (useCache) {
           Text row = new Text(tl.tablet_extent.getEndRow());
@@ -296,7 +296,7 @@ public class TabletLocatorImpl extends TabletLocator {
         } else {
           tl = _locateTablet(context, tl.tablet_extent.getEndRow(), true, false, false, lcSession);
         }
-        
+
         if (tl == null) {
           failures.add(range);
           if (!useCache)
@@ -309,46 +309,46 @@ public class TabletLocatorImpl extends TabletLocator {
       for (TabletLocation tl2 : tabletLocations) {
         TabletLocatorImpl.addRange(binnedRanges, tl2.tablet_location, tl2.tablet_extent, range);
       }
-      
+
     }
-    
+
     return failures;
   }
-  
+
   @Override
   public List<Range> binRanges(ClientContext context, List<Range> ranges, Map<String,Map<KeyExtent,List<Range>>> binnedRanges) throws AccumuloException,
       AccumuloSecurityException, TableNotFoundException {
-    
+
     /*
      * For this to be efficient, need to avoid fine grained synchronization and fine grained logging. Therefore methods called by this are not synchronized and
      * should not log.
      */
-    
+
     OpTimer opTimer = null;
     if (log.isTraceEnabled())
       opTimer = new OpTimer(log, Level.TRACE).start("Binning " + ranges.size() + " ranges for table " + tableId);
-    
+
     LockCheckerSession lcSession = new LockCheckerSession();
 
     List<Range> failures;
     rLock.lock();
     try {
       processInvalidated(context, lcSession);
-      
+
       // for this to be optimal, need to look ranges up in sorted order when
       // ranges are not present in cache... however do not want to always
       // sort ranges... therefore try binning ranges using only the cache
       // and sort whatever fails and retry
-      
+
       failures = binRanges(context, ranges, binnedRanges, true, lcSession);
     } finally {
       rLock.unlock();
     }
-    
+
     if (failures.size() > 0) {
       // sort failures by range start key
       Collections.sort(failures);
-      
+
       // try lookups again
       wLock.lock();
       try {
@@ -357,13 +357,13 @@ public class TabletLocatorImpl extends TabletLocator {
         wLock.unlock();
       }
     }
-    
+
     if (opTimer != null)
       opTimer.stop("Binned " + ranges.size() + " ranges for table " + tableId + " to " + binnedRanges.size() + " tservers in %DURATION%");
 
     return failures;
   }
-  
+
   @Override
   public void invalidateCache(KeyExtent failedExtent) {
     wLock.lock();
@@ -375,7 +375,7 @@ public class TabletLocatorImpl extends TabletLocator {
     if (log.isTraceEnabled())
       log.trace("Invalidated extent=" + failedExtent);
   }
-  
+
   @Override
   public void invalidateCache(Collection<KeyExtent> keySet) {
     wLock.lock();
@@ -387,11 +387,11 @@ public class TabletLocatorImpl extends TabletLocator {
     if (log.isTraceEnabled())
       log.trace("Invalidated " + keySet.size() + " cache entries for table " + tableId);
   }
-  
+
   @Override
   public void invalidateCache(Instance instance, String server) {
     int invalidatedCount = 0;
-    
+
     wLock.lock();
     try {
       for (TabletLocation cacheEntry : metaCache.values())
@@ -402,14 +402,14 @@ public class TabletLocatorImpl extends TabletLocator {
     } finally {
       wLock.unlock();
     }
-    
+
     lockChecker.invalidateCache(server);
 
     if (log.isTraceEnabled())
       log.trace("invalidated " + invalidatedCount + " cache entries  table=" + tableId + " server=" + server);
-    
+
   }
-  
+
   @Override
   public void invalidateCache() {
     int invalidatedCount;
@@ -423,18 +423,18 @@ public class TabletLocatorImpl extends TabletLocator {
     if (log.isTraceEnabled())
       log.trace("invalidated all " + invalidatedCount + " cache entries for table=" + tableId);
   }
-  
+
   @Override
   public TabletLocation locateTablet(ClientContext context, Text row, boolean skipRow, boolean retry) throws AccumuloException, AccumuloSecurityException,
       TableNotFoundException {
-    
+
     OpTimer opTimer = null;
     if (log.isTraceEnabled())
       opTimer = new OpTimer(log, Level.TRACE).start("Locating tablet  table=" + tableId + " row=" + TextUtil.truncate(row) + "  skipRow=" + skipRow + " retry="
           + retry);
-    
+
     while (true) {
-      
+
       LockCheckerSession lcSession = new LockCheckerSession();
       TabletLocation tl = _locateTablet(context, row, skipRow, retry, true, lcSession);
 
@@ -444,21 +444,21 @@ public class TabletLocatorImpl extends TabletLocator {
           log.trace("Failed to locate tablet containing row " + TextUtil.truncate(row) + " in table " + tableId + ", will retry...");
         continue;
       }
-      
+
       if (opTimer != null)
         opTimer.stop("Located tablet " + (tl == null ? null : tl.tablet_extent) + " at " + (tl == null ? null : tl.tablet_location) + " in %DURATION%");
-      
+
       return tl;
     }
   }
-  
+
   private void lookupTabletLocation(ClientContext context, Text row, boolean retry, LockCheckerSession lcSession) throws AccumuloException,
       AccumuloSecurityException, TableNotFoundException {
     Text metadataRow = new Text(tableId);
     metadataRow.append(new byte[] {';'}, 0, 1);
     metadataRow.append(row.getBytes(), 0, row.getLength());
     TabletLocation ptl = parent.locateTablet(context, metadataRow, false, retry);
-    
+
     if (ptl != null) {
       TabletLocations locations = locationObtainer.lookupTablet(context, ptl, metadataRow, lastTabletRow, parent);
       while (locations != null && locations.getLocations().isEmpty() && locations.getLocationless().isEmpty()) {
@@ -475,19 +475,19 @@ public class TabletLocatorImpl extends TabletLocator {
           break;
         }
       }
-      
+
       if (locations == null)
         return;
-      
+
       // cannot assume the list contains contiguous key extents... so it is probably
       // best to deal with each extent individually
-      
+
       Text lastEndRow = null;
       for (TabletLocation tabletLocation : locations.getLocations()) {
-        
+
         KeyExtent ke = tabletLocation.tablet_extent;
         TabletLocation locToCache;
-        
+
         // create new location if current prevEndRow == endRow
         if ((lastEndRow != null) && (ke.getPrevEndRow() != null) && ke.getPrevEndRow().equals(lastEndRow)) {
           locToCache = new TabletLocation(new KeyExtent(ke.getTableId(), ke.getEndRow(), lastEndRow), tabletLocation.tablet_location,
@@ -495,35 +495,35 @@ public class TabletLocatorImpl extends TabletLocator {
         } else {
           locToCache = tabletLocation;
         }
-        
+
         // save endRow for next iteration
         lastEndRow = locToCache.tablet_extent.getEndRow();
-        
+
         updateCache(locToCache, lcSession);
       }
     }
-    
+
   }
-  
+
   private void updateCache(TabletLocation tabletLocation, LockCheckerSession lcSession) {
     if (!tabletLocation.tablet_extent.getTableId().equals(tableId)) {
       // sanity check
       throw new IllegalStateException("Unexpected extent returned " + tableId + "  " + tabletLocation.tablet_extent);
     }
-    
+
     if (tabletLocation.tablet_location == null) {
       // sanity check
       throw new IllegalStateException("Cannot add null locations to cache " + tableId + "  " + tabletLocation.tablet_extent);
     }
-    
+
     if (!tabletLocation.tablet_extent.getTableId().equals(tableId)) {
       // sanity check
       throw new IllegalStateException("Cannot add other table ids to locations cache " + tableId + "  " + tabletLocation.tablet_extent);
     }
-    
+
     // clear out any overlapping extents in cache
     removeOverlapping(metaCache, tabletLocation.tablet_extent);
-    
+
     // do not add to cache unless lock is held
     if (lcSession.checkLock(tabletLocation) == null)
       return;
@@ -533,14 +533,14 @@ public class TabletLocatorImpl extends TabletLocator {
     if (er == null)
       er = MAX_TEXT;
     metaCache.put(er, tabletLocation);
-    
+
     if (badExtents.size() > 0)
       removeOverlapping(badExtents, tabletLocation.tablet_extent);
   }
-  
+
   static void removeOverlapping(TreeMap<Text,TabletLocation> metaCache, KeyExtent nke) {
     Iterator<Entry<Text,TabletLocation>> iter = null;
-    
+
     if (nke.getPrevEndRow() == null) {
       iter = metaCache.entrySet().iterator();
     } else {
@@ -548,30 +548,30 @@ public class TabletLocatorImpl extends TabletLocator {
       SortedMap<Text,TabletLocation> tailMap = metaCache.tailMap(row);
       iter = tailMap.entrySet().iterator();
     }
-    
+
     while (iter.hasNext()) {
       Entry<Text,TabletLocation> entry = iter.next();
-      
+
       KeyExtent ke = entry.getValue().tablet_extent;
-      
+
       if (stopRemoving(nke, ke)) {
         break;
       }
-      
+
       iter.remove();
     }
   }
-  
+
   private static boolean stopRemoving(KeyExtent nke, KeyExtent ke) {
     return ke.getPrevEndRow() != null && nke.getEndRow() != null && ke.getPrevEndRow().compareTo(nke.getEndRow()) >= 0;
   }
-  
+
   private static Text rowAfterPrevRow(KeyExtent nke) {
     Text row = new Text(nke.getPrevEndRow());
     row.append(new byte[] {0}, 0, 1);
     return row;
   }
-  
+
   static void removeOverlapping(TreeSet<KeyExtent> extents, KeyExtent nke) {
     for (KeyExtent overlapping : KeyExtent.findOverlapping(nke, extents)) {
       extents.remove(overlapping);
@@ -579,9 +579,9 @@ public class TabletLocatorImpl extends TabletLocator {
   }
 
   private TabletLocation locateTabletInCache(Text row) {
-    
+
     Entry<Text,TabletLocation> entry = metaCache.ceilingEntry(row);
-    
+
     if (entry != null) {
       KeyExtent ke = entry.getValue().tablet_extent;
       if (ke.getPrevEndRow() == null || ke.getPrevEndRow().compareTo(row) < 0) {
@@ -590,17 +590,17 @@ public class TabletLocatorImpl extends TabletLocator {
     }
     return null;
   }
-  
+
   protected TabletLocation _locateTablet(ClientContext context, Text row, boolean skipRow, boolean retry, boolean lock, LockCheckerSession lcSession)
       throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
-    
+
     if (skipRow) {
       row = new Text(row);
       row.append(new byte[] {0}, 0, 1);
     }
-    
+
     TabletLocation tl;
-    
+
     if (lock)
       rLock.lock();
     try {
@@ -610,30 +610,30 @@ public class TabletLocatorImpl extends TabletLocator {
       if (lock)
         rLock.unlock();
     }
-    
+
     if (tl == null) {
       if (lock)
         wLock.lock();
       try {
         // not in cache, so obtain info
         lookupTabletLocation(context, row, retry, lcSession);
-        
+
         tl = lcSession.checkLock(locateTabletInCache(row));
       } finally {
         if (lock)
           wLock.unlock();
       }
     }
-    
+
     return tl;
   }
-  
+
   private void processInvalidated(ClientContext context, LockCheckerSession lcSession) throws AccumuloSecurityException, AccumuloException,
       TableNotFoundException {
-    
+
     if (badExtents.size() == 0)
       return;
-    
+
     final boolean writeLockHeld = rwLock.isWriteLockedByCurrentThread();
     try {
       if (!writeLockHeld) {
@@ -642,27 +642,27 @@ public class TabletLocatorImpl extends TabletLocator {
         if (badExtents.size() == 0)
           return;
       }
-      
+
       List<Range> lookups = new ArrayList<Range>(badExtents.size());
-      
+
       for (KeyExtent be : badExtents) {
         lookups.add(be.toMetadataRange());
         removeOverlapping(metaCache, be);
       }
-      
+
       lookups = Range.mergeOverlapping(lookups);
-      
+
       Map<String,Map<KeyExtent,List<Range>>> binnedRanges = new HashMap<String,Map<KeyExtent,List<Range>>>();
-      
+
       parent.binRanges(context, lookups, binnedRanges);
-      
+
       // randomize server order
       ArrayList<String> tabletServers = new ArrayList<String>(binnedRanges.keySet());
       Collections.shuffle(tabletServers);
-      
+
       for (String tserver : tabletServers) {
         List<TabletLocation> locations = locationObtainer.lookupTablets(context, tserver, binnedRanges.get(tserver), parent);
-        
+
         for (TabletLocation tabletLocation : locations) {
           updateCache(tabletLocation, lcSession);
         }
@@ -674,21 +674,21 @@ public class TabletLocatorImpl extends TabletLocator {
       }
     }
   }
-  
+
   protected static void addRange(Map<String,Map<KeyExtent,List<Range>>> binnedRanges, String location, KeyExtent ke, Range range) {
     Map<KeyExtent,List<Range>> tablets = binnedRanges.get(location);
     if (tablets == null) {
       tablets = new HashMap<KeyExtent,List<Range>>();
       binnedRanges.put(location, tablets);
     }
-    
+
     List<Range> tabletsRanges = tablets.get(ke);
     if (tabletsRanges == null) {
       tabletsRanges = new ArrayList<Range>();
       tablets.put(ke, tabletsRanges);
     }
-    
+
     tabletsRanges.add(range);
   }
-  
+
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchDeleter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchDeleter.java b/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchDeleter.java
index b88571a..d3b26dc 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchDeleter.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchDeleter.java
@@ -33,11 +33,11 @@ import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.security.ColumnVisibility;
 
 public class TabletServerBatchDeleter extends TabletServerBatchReader implements BatchDeleter {
-  
+
   private final ClientContext context;
   private String tableId;
   private BatchWriterConfig bwConfig;
-  
+
   public TabletServerBatchDeleter(ClientContext context, String tableId, Authorizations authorizations, int numQueryThreads, BatchWriterConfig bwConfig)
       throws TableNotFoundException {
     super(context, tableId, authorizations, numQueryThreads);
@@ -46,7 +46,7 @@ public class TabletServerBatchDeleter extends TabletServerBatchReader implements
     this.bwConfig = bwConfig;
     super.addScanIterator(new IteratorSetting(Integer.MAX_VALUE, BatchDeleter.class.getName() + ".NOVALUE", SortedKeyIterator.class));
   }
-  
+
   @Override
   public void delete() throws MutationsRejectedException, TableNotFoundException {
     BatchWriter bw = null;
@@ -65,5 +65,5 @@ public class TabletServerBatchDeleter extends TabletServerBatchReader implements
         bw.close();
     }
   }
-  
+
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReader.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReader.java b/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReader.java
index 2a79f05..a7422c2 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReader.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReader.java
@@ -34,25 +34,25 @@ import org.apache.log4j.Logger;
 
 public class TabletServerBatchReader extends ScannerOptions implements BatchScanner {
   private static final Logger log = Logger.getLogger(TabletServerBatchReader.class);
-  
+
   private String table;
   private int numThreads;
   private ExecutorService queryThreadPool;
-  
+
   private final ClientContext context;
   private ArrayList<Range> ranges;
-  
+
   private Authorizations authorizations = Authorizations.EMPTY;
   private Throwable ex = null;
-  
+
   private static int nextBatchReaderInstance = 1;
-  
+
   private static synchronized int getNextBatchReaderInstance() {
     return nextBatchReaderInstance++;
   }
-  
+
   private final int batchReaderInstance = getNextBatchReaderInstance();
-  
+
   public TabletServerBatchReader(ClientContext context, String table, Authorizations authorizations, int numQueryThreads) {
     checkArgument(context != null, "context is null");
     checkArgument(table != null, "table is null");
@@ -61,18 +61,18 @@ public class TabletServerBatchReader extends ScannerOptions implements BatchScan
     this.authorizations = authorizations;
     this.table = table;
     this.numThreads = numQueryThreads;
-    
+
     queryThreadPool = new SimpleThreadPool(numQueryThreads, "batch scanner " + batchReaderInstance + "-");
-    
+
     ranges = null;
     ex = new Throwable();
   }
-  
+
   @Override
   public void close() {
     queryThreadPool.shutdownNow();
   }
-  
+
   /**
    * Warning: do not rely upon finalize to close this class. Finalize is not guaranteed to be called.
    */
@@ -83,31 +83,31 @@ public class TabletServerBatchReader extends ScannerOptions implements BatchScan
       close();
     }
   }
-  
+
   @Override
   public void setRanges(Collection<Range> ranges) {
     if (ranges == null || ranges.size() == 0) {
       throw new IllegalArgumentException("ranges must be non null and contain at least 1 range");
     }
-    
+
     if (queryThreadPool.isShutdown()) {
       throw new IllegalStateException("batch reader closed");
     }
-    
+
     this.ranges = new ArrayList<Range>(ranges);
-    
+
   }
-  
+
   @Override
   public Iterator<Entry<Key,Value>> iterator() {
     if (ranges == null) {
       throw new IllegalStateException("ranges not set");
     }
-    
+
     if (queryThreadPool.isShutdown()) {
       throw new IllegalStateException("batch reader closed");
     }
-    
+
     return new TabletServerBatchReaderIterator(context, table, authorizations, ranges, numThreads, queryThreadPool, this, timeOut);
   }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReaderIterator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReaderIterator.java b/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReaderIterator.java
index eb80f8b..df330ed 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReaderIterator.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReaderIterator.java
@@ -73,9 +73,9 @@ import org.apache.thrift.transport.TTransportException;
 import com.google.common.net.HostAndPort;
 
 public class TabletServerBatchReaderIterator implements Iterator<Entry<Key,Value>> {
-  
+
   private static final Logger log = Logger.getLogger(TabletServerBatchReaderIterator.class);
-  
+
   private final ClientContext context;
   private final Instance instance;
   private final String table;
@@ -83,30 +83,30 @@ public class TabletServerBatchReaderIterator implements Iterator<Entry<Key,Value
   private final int numThreads;
   private final ExecutorService queryThreadPool;
   private final ScannerOptions options;
-  
+
   private ArrayBlockingQueue<List<Entry<Key,Value>>> resultsQueue;
   private Iterator<Entry<Key,Value>> batchIterator;
   private List<Entry<Key,Value>> batch;
   private static final List<Entry<Key,Value>> LAST_BATCH = new ArrayList<Map.Entry<Key,Value>>();
   private final Object nextLock = new Object();
-  
+
   private long failSleepTime = 100;
-  
+
   private volatile Throwable fatalException = null;
-  
+
   private Map<String,TimeoutTracker> timeoutTrackers;
   private Set<String> timedoutServers;
   private long timeout;
-  
+
   private TabletLocator locator;
-  
+
   public interface ResultReceiver {
     void receive(List<Entry<Key,Value>> entries);
   }
-  
+
   public TabletServerBatchReaderIterator(ClientContext context, String table, Authorizations authorizations, ArrayList<Range> ranges, int numThreads,
       ExecutorService queryThreadPool, ScannerOptions scannerOptions, long timeout) {
-    
+
     this.context = context;
     this.instance = context.getInstance();
     this.table = table;
@@ -115,24 +115,24 @@ public class TabletServerBatchReaderIterator implements Iterator<Entry<Key,Value
     this.queryThreadPool = queryThreadPool;
     this.options = new ScannerOptions(scannerOptions);
     resultsQueue = new ArrayBlockingQueue<List<Entry<Key,Value>>>(numThreads);
-    
+
     this.locator = new TimeoutTabletLocator(TabletLocator.getLocator(context, new Text(table)), timeout);
-    
+
     timeoutTrackers = Collections.synchronizedMap(new HashMap<String,TabletServerBatchReaderIterator.TimeoutTracker>());
     timedoutServers = Collections.synchronizedSet(new HashSet<String>());
     this.timeout = timeout;
-    
+
     if (options.fetchedColumns.size() > 0) {
       ArrayList<Range> ranges2 = new ArrayList<Range>(ranges.size());
       for (Range range : ranges) {
         ranges2.add(range.bound(options.fetchedColumns.first(), options.fetchedColumns.last()));
       }
-      
+
       ranges = ranges2;
     }
-    
+
     ResultReceiver rr = new ResultReceiver() {
-      
+
       @Override
       public void receive(List<Entry<Key,Value>> entries) {
         try {
@@ -144,12 +144,12 @@ public class TabletServerBatchReaderIterator implements Iterator<Entry<Key,Value
             log.warn("Failed to add Batch Scan result", e);
           fatalException = e;
           throw new RuntimeException(e);
-          
+
         }
       }
-      
+
     };
-    
+
     try {
       lookup(ranges, rr);
     } catch (RuntimeException re) {
@@ -158,31 +158,31 @@ public class TabletServerBatchReaderIterator implements Iterator<Entry<Key,Value
       throw new RuntimeException("Failed to create iterator", e);
     }
   }
-  
+
   @Override
   public boolean hasNext() {
     synchronized (nextLock) {
       if (batch == LAST_BATCH)
         return false;
-      
+
       if (batch != null && batchIterator.hasNext())
         return true;
-      
+
       // don't have one cached, try to cache one and return success
       try {
         batch = null;
         while (batch == null && fatalException == null && !queryThreadPool.isShutdown())
           batch = resultsQueue.poll(1, TimeUnit.SECONDS);
-        
+
         if (fatalException != null)
           if (fatalException instanceof RuntimeException)
             throw (RuntimeException) fatalException;
           else
             throw new RuntimeException(fatalException);
-        
+
         if (queryThreadPool.isShutdown())
           throw new RuntimeException("scanner closed");
-        
+
         batchIterator = batch.iterator();
         return batch != LAST_BATCH;
       } catch (InterruptedException e) {
@@ -190,7 +190,7 @@ public class TabletServerBatchReaderIterator implements Iterator<Entry<Key,Value
       }
     }
   }
-  
+
   @Override
   public Entry<Key,Value> next() {
     // if there's one waiting, or hasNext() can get one, return it
@@ -201,33 +201,33 @@ public class TabletServerBatchReaderIterator implements Iterator<Entry<Key,Value
         throw new NoSuchElementException();
     }
   }
-  
+
   @Override
   public void remove() {
     throw new UnsupportedOperationException();
   }
-  
+
   private synchronized void lookup(List<Range> ranges, ResultReceiver receiver) throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
     List<Column> columns = new ArrayList<Column>(options.fetchedColumns);
     ranges = Range.mergeOverlapping(ranges);
-    
+
     Map<String,Map<KeyExtent,List<Range>>> binnedRanges = new HashMap<String,Map<KeyExtent,List<Range>>>();
-    
+
     binRanges(locator, ranges, binnedRanges);
-    
+
     doLookups(binnedRanges, receiver, columns);
   }
-  
+
   private void binRanges(TabletLocator tabletLocator, List<Range> ranges, Map<String,Map<KeyExtent,List<Range>>> binnedRanges) throws AccumuloException,
       AccumuloSecurityException, TableNotFoundException {
-    
+
     int lastFailureSize = Integer.MAX_VALUE;
-    
+
     while (true) {
-      
+
       binnedRanges.clear();
       List<Range> failures = tabletLocator.binRanges(context, ranges, binnedRanges);
-      
+
       if (failures.size() > 0) {
         // tried to only do table state checks when failures.size() == ranges.size(), however this did
         // not work because nothing ever invalidated entries in the tabletLocator cache... so even though
@@ -238,9 +238,9 @@ public class TabletServerBatchReaderIterator implements Iterator<Entry<Key,Value
             throw new TableDeletedException(table);
           else if (Tables.getTableState(instance, table) == TableState.OFFLINE)
             throw new TableOfflineException(instance, table);
-        
+
         lastFailureSize = failures.size();
-        
+
         if (log.isTraceEnabled())
           log.trace("Failed to bin " + failures.size() + " ranges, tablet locations were null, retrying in 100ms");
         try {
@@ -251,9 +251,9 @@ public class TabletServerBatchReaderIterator implements Iterator<Entry<Key,Value
       } else {
         break;
       }
-      
+
     }
-    
+
     // truncate the ranges to within the tablets... this makes it easier to know what work
     // needs to be redone when failures occurs and tablets have merged or split
     Map<String,Map<KeyExtent,List<Range>>> binnedRanges2 = new HashMap<String,Map<KeyExtent,List<Range>>>();
@@ -268,47 +268,47 @@ public class TabletServerBatchReaderIterator implements Iterator<Entry<Key,Value
           clippedRanges.add(tabletRange.clip(range));
       }
     }
-    
+
     binnedRanges.clear();
     binnedRanges.putAll(binnedRanges2);
   }
-  
+
   private void processFailures(Map<KeyExtent,List<Range>> failures, ResultReceiver receiver, List<Column> columns) throws AccumuloException,
       AccumuloSecurityException, TableNotFoundException {
     if (log.isTraceEnabled())
       log.trace("Failed to execute multiscans against " + failures.size() + " tablets, retrying...");
-    
+
     try {
       Thread.sleep(failSleepTime);
     } catch (InterruptedException e) {
       Thread.currentThread().interrupt();
-      
+
       // We were interrupted (close called on batchscanner) just exit
       log.debug("Exiting failure processing on interrupt");
       return;
     }
 
     failSleepTime = Math.min(5000, failSleepTime * 2);
-    
+
     Map<String,Map<KeyExtent,List<Range>>> binnedRanges = new HashMap<String,Map<KeyExtent,List<Range>>>();
     List<Range> allRanges = new ArrayList<Range>();
-    
+
     for (List<Range> ranges : failures.values())
       allRanges.addAll(ranges);
-    
+
     // since the first call to binRanges clipped the ranges to within a tablet, we should not get only
     // bin to the set of failed tablets
     binRanges(locator, allRanges, binnedRanges);
-    
+
     doLookups(binnedRanges, receiver, columns);
   }
-  
+
   private String getTableInfo() {
     return Tables.getPrintableTableInfoFromId(instance, table);
   }
-  
+
   private class QueryTask implements Runnable {
-    
+
     private String tsLocation;
     private Map<KeyExtent,List<Range>> tabletsRanges;
     private ResultReceiver receiver;
@@ -316,7 +316,7 @@ public class TabletServerBatchReaderIterator implements Iterator<Entry<Key,Value
     private final Map<KeyExtent,List<Range>> failures;
     private List<Column> columns;
     private int semaphoreSize;
-    
+
     QueryTask(String tsLocation, Map<KeyExtent,List<Range>> tabletsRanges, Map<KeyExtent,List<Range>> failures, ResultReceiver receiver, List<Column> columns) {
       this.tsLocation = tsLocation;
       this.tabletsRanges = tabletsRanges;
@@ -324,12 +324,12 @@ public class TabletServerBatchReaderIterator implements Iterator<Entry<Key,Value
       this.columns = columns;
       this.failures = failures;
     }
-    
+
     void setSemaphore(Semaphore semaphore, int semaphoreSize) {
       this.semaphore = semaphore;
       this.semaphoreSize = semaphoreSize;
     }
-    
+
     @Override
     public void run() {
       String threadName = Thread.currentThread().getName();
@@ -349,19 +349,19 @@ public class TabletServerBatchReaderIterator implements Iterator<Entry<Key,Value
             failures.putAll(tsFailures);
           }
         }
-        
+
       } catch (IOException e) {
         synchronized (failures) {
           failures.putAll(tsFailures);
           failures.putAll(unscanned);
         }
-        
+
         locator.invalidateCache(context.getInstance(), tsLocation);
         log.debug(e.getMessage(), e);
       } catch (AccumuloSecurityException e) {
         e.setTableInfo(getTableInfo());
         log.debug(e.getMessage(), e);
-        
+
         Tables.clearCache(instance);
         if (!Tables.exists(instance, table))
           fatalException = new TableDeletedException(table);
@@ -396,7 +396,7 @@ public class TabletServerBatchReaderIterator implements Iterator<Entry<Key,Value
               log.debug(t.getMessage(), t);
               fatalException = t;
             }
-            
+
             if (fatalException != null) {
               // we are finished with this batch query
               if (!resultsQueue.offer(LAST_BATCH)) {
@@ -423,16 +423,16 @@ public class TabletServerBatchReaderIterator implements Iterator<Entry<Key,Value
         }
       }
     }
-    
+
   }
-  
+
   private void doLookups(Map<String,Map<KeyExtent,List<Range>>> binnedRanges, final ResultReceiver receiver, List<Column> columns) {
-    
+
     if (timedoutServers.containsAll(binnedRanges.keySet())) {
       // all servers have timed out
       throw new TimedOutException(timedoutServers);
     }
-    
+
     // when there are lots of threads and a few tablet servers
     // it is good to break request to tablet servers up, the
     // following code determines if this is the case
@@ -442,16 +442,16 @@ public class TabletServerBatchReaderIterator implements Iterator<Entry<Key,Value
       for (Entry<String,Map<KeyExtent,List<Range>>> entry : binnedRanges.entrySet()) {
         totalNumberOfTablets += entry.getValue().size();
       }
-      
+
       maxTabletsPerRequest = totalNumberOfTablets / numThreads;
       if (maxTabletsPerRequest == 0) {
         maxTabletsPerRequest = 1;
       }
-      
+
     }
-    
+
     Map<KeyExtent,List<Range>> failures = new HashMap<KeyExtent,List<Range>>();
-    
+
     if (timedoutServers.size() > 0) {
       // go ahead and fail any timed out servers
       for (Iterator<Entry<String,Map<KeyExtent,List<Range>>>> iterator = binnedRanges.entrySet().iterator(); iterator.hasNext();) {
@@ -462,16 +462,16 @@ public class TabletServerBatchReaderIterator implements Iterator<Entry<Key,Value
         }
       }
     }
-    
+
     // randomize tabletserver order... this will help when there are multiple
     // batch readers and writers running against accumulo
     List<String> locations = new ArrayList<String>(binnedRanges.keySet());
     Collections.shuffle(locations);
-    
+
     List<QueryTask> queryTasks = new ArrayList<QueryTask>();
-    
+
     for (final String tsLocation : locations) {
-      
+
       final Map<KeyExtent,List<Range>> tabletsRanges = binnedRanges.get(tsLocation);
       if (maxTabletsPerRequest == Integer.MAX_VALUE || tabletsRanges.size() == 1) {
         QueryTask queryTask = new QueryTask(tsLocation, tabletsRanges, failures, receiver, columns);
@@ -486,44 +486,44 @@ public class TabletServerBatchReaderIterator implements Iterator<Entry<Key,Value
             tabletSubset = new HashMap<KeyExtent,List<Range>>();
           }
         }
-        
+
         if (tabletSubset.size() > 0) {
           QueryTask queryTask = new QueryTask(tsLocation, tabletSubset, failures, receiver, columns);
           queryTasks.add(queryTask);
         }
       }
     }
-    
+
     final Semaphore semaphore = new Semaphore(queryTasks.size());
     semaphore.acquireUninterruptibly(queryTasks.size());
-    
+
     for (QueryTask queryTask : queryTasks) {
       queryTask.setSemaphore(semaphore, queryTasks.size());
       queryThreadPool.execute(new TraceRunnable(queryTask));
     }
   }
-  
+
   static void trackScanning(Map<KeyExtent,List<Range>> failures, Map<KeyExtent,List<Range>> unscanned, MultiScanResult scanResult) {
-    
+
     // translate returned failures, remove them from unscanned, and add them to failures
     Map<KeyExtent,List<Range>> retFailures = Translator.translate(scanResult.failures, Translators.TKET, new Translator.ListTranslator<TRange,Range>(
         Translators.TRT));
     unscanned.keySet().removeAll(retFailures.keySet());
     failures.putAll(retFailures);
-    
+
     // translate full scans and remove them from unscanned
     HashSet<KeyExtent> fullScans = new HashSet<KeyExtent>(Translator.translate(scanResult.fullScans, Translators.TKET));
     unscanned.keySet().removeAll(fullScans);
-    
+
     // remove partial scan from unscanned
     if (scanResult.partScan != null) {
       KeyExtent ke = new KeyExtent(scanResult.partScan);
       Key nextKey = new Key(scanResult.partNextKey);
-      
+
       ListIterator<Range> iterator = unscanned.get(ke).listIterator();
       while (iterator.hasNext()) {
         Range range = iterator.next();
-        
+
         if (range.afterEndKey(nextKey) || (nextKey.equals(range.getEndKey()) && scanResult.partNextKeyInclusive != range.isEndKeyInclusive())) {
           iterator.remove();
         } else if (range.contains(nextKey)) {
@@ -534,41 +534,41 @@ public class TabletServerBatchReaderIterator implements Iterator<Entry<Key,Value
       }
     }
   }
-  
+
   private static class TimeoutTracker {
-    
+
     String server;
     Set<String> badServers;
     long timeOut;
     long activityTime;
     Long firstErrorTime = null;
-    
+
     TimeoutTracker(String server, Set<String> badServers, long timeOut) {
       this(timeOut);
       this.server = server;
       this.badServers = badServers;
     }
-    
+
     TimeoutTracker(long timeOut) {
       this.timeOut = timeOut;
     }
-    
+
     void startingScan() {
       activityTime = System.currentTimeMillis();
     }
-    
+
     void check() throws IOException {
       if (System.currentTimeMillis() - activityTime > timeOut) {
         badServers.add(server);
         throw new IOException("Time exceeded " + (System.currentTimeMillis() - activityTime) + " " + server);
       }
     }
-    
+
     void madeProgress() {
       activityTime = System.currentTimeMillis();
       firstErrorTime = null;
     }
-    
+
     void errorOccured(Exception e) {
       if (firstErrorTime == null) {
         firstErrorTime = activityTime;
@@ -576,28 +576,28 @@ public class TabletServerBatchReaderIterator implements Iterator<Entry<Key,Value
         badServers.add(server);
       }
     }
-    
+
     /**
      */
     public long getTimeOut() {
       return timeOut;
     }
   }
-  
+
   public static void doLookup(ClientContext context, String server, Map<KeyExtent,List<Range>> requested, Map<KeyExtent,List<Range>> failures,
       Map<KeyExtent,List<Range>> unscanned, ResultReceiver receiver, List<Column> columns, ScannerOptions options, Authorizations authorizations)
       throws IOException, AccumuloSecurityException, AccumuloServerException {
     doLookup(context, server, requested, failures, unscanned, receiver, columns, options, authorizations, new TimeoutTracker(Long.MAX_VALUE));
   }
-  
+
   static void doLookup(ClientContext context, String server, Map<KeyExtent,List<Range>> requested, Map<KeyExtent,List<Range>> failures,
       Map<KeyExtent,List<Range>> unscanned, ResultReceiver receiver, List<Column> columns, ScannerOptions options, Authorizations authorizations,
       TimeoutTracker timeoutTracker) throws IOException, AccumuloSecurityException, AccumuloServerException {
-    
+
     if (requested.size() == 0) {
       return;
     }
-    
+
     // copy requested to unscanned map. we will remove ranges as they are scanned in trackScanning()
     for (Entry<KeyExtent,List<Range>> entry : requested.entrySet()) {
       ArrayList<Range> ranges = new ArrayList<Range>();
@@ -606,7 +606,7 @@ public class TabletServerBatchReaderIterator implements Iterator<Entry<Key,Value
       }
       unscanned.put(new KeyExtent(entry.getKey()), ranges);
     }
-    
+
     timeoutTracker.startingScan();
     TTransport transport = null;
     try {
@@ -618,13 +618,13 @@ public class TabletServerBatchReaderIterator implements Iterator<Entry<Key,Value
         client = ThriftUtil.getTServerClient(parsedServer, context);
 
       try {
-        
+
         OpTimer opTimer = new OpTimer(log, Level.TRACE).start("Starting multi scan, tserver=" + server + "  #tablets=" + requested.size() + "  #ranges="
             + sumSizes(requested.values()) + " ssil=" + options.serverSideIteratorList + " ssio=" + options.serverSideIteratorOptions);
-        
+
         TabletType ttype = TabletType.type(requested.keySet());
         boolean waitForWrites = !ThriftScanner.serversWaitedForWrites.get(ttype).contains(server);
-        
+
         Map<TKeyExtent,List<TRange>> thriftTabletRanges = Translator.translate(requested, Translators.KET, new Translator.ListTranslator<Range,TRange>(
             Translators.RT));
         InitialMultiScan imsr = client.startMultiScan(Tracer.traceInfo(), context.rpcCreds(), thriftTabletRanges,
@@ -634,48 +634,48 @@ public class TabletServerBatchReaderIterator implements Iterator<Entry<Key,Value
           ThriftScanner.serversWaitedForWrites.get(ttype).add(server.toString());
 
         MultiScanResult scanResult = imsr.result;
-        
+
         opTimer.stop("Got 1st multi scan results, #results=" + scanResult.results.size() + (scanResult.more ? "  scanID=" + imsr.scanID : "")
             + " in %DURATION%");
-        
+
         ArrayList<Entry<Key,Value>> entries = new ArrayList<Map.Entry<Key,Value>>(scanResult.results.size());
         for (TKeyValue kv : scanResult.results) {
           entries.add(new SimpleImmutableEntry<Key,Value>(new Key(kv.key), new Value(kv.value)));
         }
-        
+
         if (entries.size() > 0)
           receiver.receive(entries);
-        
+
         if (entries.size() > 0 || scanResult.fullScans.size() > 0)
           timeoutTracker.madeProgress();
-        
+
         trackScanning(failures, unscanned, scanResult);
-        
+
         while (scanResult.more) {
-          
+
           timeoutTracker.check();
-          
+
           opTimer.start("Continuing multi scan, scanid=" + imsr.scanID);
           scanResult = client.continueMultiScan(Tracer.traceInfo(), imsr.scanID);
           opTimer.stop("Got more multi scan results, #results=" + scanResult.results.size() + (scanResult.more ? "  scanID=" + imsr.scanID : "")
               + " in %DURATION%");
-          
+
           entries = new ArrayList<Map.Entry<Key,Value>>(scanResult.results.size());
           for (TKeyValue kv : scanResult.results) {
             entries.add(new SimpleImmutableEntry<Key,Value>(new Key(kv.key), new Value(kv.value)));
           }
-          
+
           if (entries.size() > 0)
             receiver.receive(entries);
-          
+
           if (entries.size() > 0 || scanResult.fullScans.size() > 0)
             timeoutTracker.madeProgress();
-          
+
           trackScanning(failures, unscanned, scanResult);
         }
-        
+
         client.closeMultiScan(Tracer.traceInfo(), imsr.scanID);
-        
+
       } finally {
         ThriftUtil.returnClient(client);
       }
@@ -700,14 +700,14 @@ public class TabletServerBatchReaderIterator implements Iterator<Entry<Key,Value
       ThriftTransportPool.getInstance().returnTransport(transport);
     }
   }
-  
+
   static int sumSizes(Collection<List<Range>> values) {
     int sum = 0;
-    
+
     for (List<Range> list : values) {
       sum += list.size();
     }
-    
+
     return sum;
   }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java b/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java
index c54c2f1..7abc826 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java
@@ -76,45 +76,45 @@ import com.google.common.net.HostAndPort;
 /*
  * Differences from previous TabletServerBatchWriter
  *   + As background threads finish sending mutations to tablet servers they decrement memory usage
- *   + Once the queue of unprocessed mutations reaches 50% it is always pushed to the background threads, 
- *      even if they are currently processing... new mutations are merged with mutations currently 
+ *   + Once the queue of unprocessed mutations reaches 50% it is always pushed to the background threads,
+ *      even if they are currently processing... new mutations are merged with mutations currently
  *      processing in the background
  *   + Failed mutations are held for 1000ms and then re-added to the unprocessed queue
  *   + Flush holds adding of new mutations so it does not wait indefinitely
- * 
+ *
  * Considerations
  *   + All background threads must catch and note Throwable
- *   + mutations for a single tablet server are only processed by one thread concurrently (if new mutations 
- *      come in for a tablet server while one thread is processing mutations for it, no other thread should 
+ *   + mutations for a single tablet server are only processed by one thread concurrently (if new mutations
+ *      come in for a tablet server while one thread is processing mutations for it, no other thread should
  *      start processing those mutations)
- *   
+ *
  * Memory accounting
  *   + when a mutation enters the system memory is incremented
  *   + when a mutation successfully leaves the system memory is decremented
- * 
- * 
- * 
+ *
+ *
+ *
  */
 
 public class TabletServerBatchWriter {
-  
+
   private static final Logger log = Logger.getLogger(TabletServerBatchWriter.class);
-  
+
   // basic configuration
   private final ClientContext context;
   private final long maxMem;
   private final long maxLatency;
   private final long timeout;
   private final Durability durability;
-  
+
   // state
   private boolean flushing;
   private boolean closed;
   private MutationSet mutations;
-  
+
   // background writer
   private final MutationWriter writer;
-  
+
   // latency timers
   private final Timer jtimer = new Timer("BatchWriterLatencyTimer", true);
   private final Map<String,TimeoutTracker> timeoutTrackers = Collections.synchronizedMap(new HashMap<String,TabletServerBatchWriter.TimeoutTracker>());
@@ -122,7 +122,7 @@ public class TabletServerBatchWriter {
   // stats
   private long totalMemUsed = 0;
   private long lastProcessingStartTime;
-  
+
   private long totalAdded = 0;
   private final AtomicLong totalSent = new AtomicLong(0);
   private final AtomicLong totalBinned = new AtomicLong(0);
@@ -132,7 +132,7 @@ public class TabletServerBatchWriter {
   private long initialGCTimes;
   private long initialCompileTimes;
   private double initialSystemLoad;
-  
+
   private int tabletServersBatchSum = 0;
   private int tabletBatchSum = 0;
   private int numBatches = 0;
@@ -140,7 +140,7 @@ public class TabletServerBatchWriter {
   private int minTabletBatch = Integer.MAX_VALUE;
   private int minTabletServersBatch = Integer.MAX_VALUE;
   private int maxTabletServersBatch = Integer.MIN_VALUE;
-  
+
   // error handling
   private final Violations violations = new Violations();
   private final Map<KeyExtent,Set<SecurityErrorCode>> authorizationFailures = new HashMap<KeyExtent,Set<SecurityErrorCode>>();
@@ -156,21 +156,21 @@ public class TabletServerBatchWriter {
     final long timeOut;
     long activityTime;
     Long firstErrorTime = null;
-    
+
     TimeoutTracker(String server, long timeOut) {
       this.timeOut = timeOut;
       this.server = server;
     }
-    
+
     void startingWrite() {
       activityTime = System.currentTimeMillis();
     }
-    
+
     void madeProgress() {
       activityTime = System.currentTimeMillis();
       firstErrorTime = null;
     }
-    
+
     void wroteNothing() {
       if (firstErrorTime == null) {
         firstErrorTime = activityTime;
@@ -178,16 +178,16 @@ public class TabletServerBatchWriter {
         throw new TimedOutException(Collections.singleton(server));
       }
     }
-    
+
     void errorOccured(Exception e) {
       wroteNothing();
     }
-    
+
     public long getTimeOut() {
       return timeOut;
     }
   }
-  
+
   public TabletServerBatchWriter(ClientContext context, BatchWriterConfig config) {
     this.context = context;
     this.maxMem = config.getMaxMemory();
@@ -196,9 +196,9 @@ public class TabletServerBatchWriter {
     this.mutations = new MutationSet();
     this.lastProcessingStartTime = System.currentTimeMillis();
     this.durability = config.getDurability();
-    
+
     this.writer = new MutationWriter(config.getMaxWriteThreads());
-    
+
     if (this.maxLatency != Long.MAX_VALUE) {
       jtimer.schedule(new TimerTask() {
         @Override
@@ -215,7 +215,7 @@ public class TabletServerBatchWriter {
       }, 0, this.maxLatency / 4);
     }
   }
-  
+
   private synchronized void startProcessing() {
     if (mutations.getMemoryUsed() == 0)
       return;
@@ -223,125 +223,125 @@ public class TabletServerBatchWriter {
     writer.addMutations(mutations);
     mutations = new MutationSet();
   }
-  
+
   private synchronized void decrementMemUsed(long amount) {
     totalMemUsed -= amount;
     this.notifyAll();
   }
-  
+
   public synchronized void addMutation(String table, Mutation m) throws MutationsRejectedException {
-    
+
     if (closed)
       throw new IllegalStateException("Closed");
     if (m.size() == 0)
       throw new IllegalArgumentException("Can not add empty mutations");
-    
+
     checkForFailures();
-    
+
     while ((totalMemUsed > maxMem || flushing) && !somethingFailed) {
       waitRTE();
     }
-    
+
     // do checks again since things could have changed while waiting and not holding lock
     if (closed)
       throw new IllegalStateException("Closed");
     checkForFailures();
-    
+
     if (startTime == 0) {
       startTime = System.currentTimeMillis();
-      
+
       List<GarbageCollectorMXBean> gcmBeans = ManagementFactory.getGarbageCollectorMXBeans();
       for (GarbageCollectorMXBean garbageCollectorMXBean : gcmBeans) {
         initialGCTimes += garbageCollectorMXBean.getCollectionTime();
       }
-      
+
       CompilationMXBean compMxBean = ManagementFactory.getCompilationMXBean();
       if (compMxBean.isCompilationTimeMonitoringSupported()) {
         initialCompileTimes = compMxBean.getTotalCompilationTime();
       }
-      
+
       initialSystemLoad = ManagementFactory.getOperatingSystemMXBean().getSystemLoadAverage();
     }
-    
+
     // create a copy of mutation so that after this method returns the user
     // is free to reuse the mutation object, like calling readFields... this
     // is important for the case where a mutation is passed from map to reduce
     // to batch writer... the map reduce code will keep passing the same mutation
     // object into the reduce method
     m = new Mutation(m);
-    
+
     totalMemUsed += m.estimatedMemoryUsed();
     mutations.addMutation(table, m);
     totalAdded++;
-    
+
     if (mutations.getMemoryUsed() >= maxMem / 2) {
       startProcessing();
       checkForFailures();
     }
   }
-  
+
   public void addMutation(String table, Iterator<Mutation> iterator) throws MutationsRejectedException {
     while (iterator.hasNext()) {
       addMutation(table, iterator.next());
     }
   }
-  
+
   public synchronized void flush() throws MutationsRejectedException {
-    
+
     if (closed)
       throw new IllegalStateException("Closed");
-    
+
     Span span = Trace.start("flush");
-    
+
     try {
       checkForFailures();
-      
+
       if (flushing) {
         // some other thread is currently flushing, so wait
         while (flushing && !somethingFailed)
           waitRTE();
-        
+
         checkForFailures();
-        
+
         return;
       }
-      
+
       flushing = true;
-      
+
       startProcessing();
       checkForFailures();
-      
+
       while (totalMemUsed > 0 && !somethingFailed) {
         waitRTE();
       }
-      
+
       flushing = false;
       this.notifyAll();
-      
+
       checkForFailures();
     } finally {
       span.stop();
       // somethingFailed = false;
     }
   }
-  
+
   public synchronized void close() throws MutationsRejectedException {
-    
+
     if (closed)
       return;
-    
+
     Span span = Trace.start("close");
     try {
       closed = true;
-      
+
       startProcessing();
-      
+
       while (totalMemUsed > 0 && !somethingFailed) {
         waitRTE();
       }
-      
+
       logStats();
-      
+
       checkForFailures();
     } finally {
       // make a best effort to release these resources
@@ -350,27 +350,27 @@ public class TabletServerBatchWriter {
       span.stop();
     }
   }
-  
+
   private void logStats() {
     long finishTime = System.currentTimeMillis();
-    
+
     long finalGCTimes = 0;
     List<GarbageCollectorMXBean> gcmBeans = ManagementFactory.getGarbageCollectorMXBeans();
     for (GarbageCollectorMXBean garbageCollectorMXBean : gcmBeans) {
       finalGCTimes += garbageCollectorMXBean.getCollectionTime();
     }
-    
+
     CompilationMXBean compMxBean = ManagementFactory.getCompilationMXBean();
     long finalCompileTimes = 0;
     if (compMxBean.isCompilationTimeMonitoringSupported()) {
       finalCompileTimes = compMxBean.getTotalCompilationTime();
     }
-    
+
     double averageRate = totalSent.get() / (totalSendTime.get() / 1000.0);
     double overallRate = totalAdded / ((finishTime - startTime) / 1000.0);
-    
+
     double finalSystemLoad = ManagementFactory.getOperatingSystemMXBean().getSystemLoadAverage();
-    
+
     if (log.isTraceEnabled()) {
       log.trace("");
       log.trace("TABLET SERVER BATCH WRITER STATISTICS");
@@ -400,39 +400,39 @@ public class TabletServerBatchWriter {
       log.trace(String.format("System load average : initial=%6.2f final=%6.2f", initialSystemLoad, finalSystemLoad));
     }
   }
-  
+
   private void updateSendStats(long count, long time) {
     totalSent.addAndGet(count);
     totalSendTime.addAndGet(time);
   }
-  
+
   public void updateBinningStats(int count, long time, Map<String,TabletServerMutations<Mutation>> binnedMutations) {
     totalBinTime.addAndGet(time);
     totalBinned.addAndGet(count);
     updateBatchStats(binnedMutations);
   }
-  
+
   private synchronized void updateBatchStats(Map<String,TabletServerMutations<Mutation>> binnedMutations) {
     tabletServersBatchSum += binnedMutations.size();
-    
+
     minTabletServersBatch = Math.min(minTabletServersBatch, binnedMutations.size());
     maxTabletServersBatch = Math.max(maxTabletServersBatch, binnedMutations.size());
-    
+
     int numTablets = 0;
-    
+
     for (Entry<String,TabletServerMutations<Mutation>> entry : binnedMutations.entrySet()) {
       TabletServerMutations<Mutation> tsm = entry.getValue();
       numTablets += tsm.getMutations().size();
     }
-    
+
     tabletBatchSum += numTablets;
-    
+
     minTabletBatch = Math.min(minTabletBatch, numTablets);
     maxTabletBatch = Math.max(maxTabletBatch, numTablets);
-    
+
     numBatches++;
   }
-  
+
   private void waitRTE() {
     try {
       wait();
@@ -440,9 +440,9 @@ public class TabletServerBatchWriter {
       throw new RuntimeException(e);
     }
   }
-  
+
   // BEGIN code for handling unrecoverable errors
-  
+
   private void updatedConstraintViolations(List<ConstraintViolationSummary> cvsList) {
     if (cvsList.size() > 0) {
       synchronized (this) {
@@ -452,28 +452,28 @@ public class TabletServerBatchWriter {
       }
     }
   }
-  
+
   private void updateAuthorizationFailures(Set<KeyExtent> keySet, SecurityErrorCode code) {
     HashMap<KeyExtent,SecurityErrorCode> map = new HashMap<KeyExtent,SecurityErrorCode>();
     for (KeyExtent ke : keySet)
       map.put(ke, code);
-    
+
     updateAuthorizationFailures(map);
   }
-  
+
   private void updateAuthorizationFailures(Map<KeyExtent,SecurityErrorCode> authorizationFailures) {
     if (authorizationFailures.size() > 0) {
-      
+
       // was a table deleted?
       HashSet<String> tableIds = new HashSet<String>();
       for (KeyExtent ke : authorizationFailures.keySet())
         tableIds.add(ke.getTableId().toString());
-      
+
       Tables.clearCache(context.getInstance());
       for (String tableId : tableIds)
         if (!Tables.exists(context.getInstance(), tableId))
           throw new TableDeletedException(tableId);
-      
+
       synchronized (this) {
         somethingFailed = true;
         mergeAuthorizationFailures(this.authorizationFailures, authorizationFailures);
@@ -481,7 +481,7 @@ public class TabletServerBatchWriter {
       }
     }
   }
-  
+
   private void mergeAuthorizationFailures(Map<KeyExtent,Set<SecurityErrorCode>> source, Map<KeyExtent,SecurityErrorCode> addition) {
     for (Entry<KeyExtent,SecurityErrorCode> entry : addition.entrySet()) {
       Set<SecurityErrorCode> secs = source.get(entry.getKey());
@@ -492,14 +492,14 @@ public class TabletServerBatchWriter {
       secs.add(entry.getValue());
     }
   }
-  
+
   private synchronized void updateServerErrors(String server, Exception e) {
     somethingFailed = true;
     this.serverSideErrors.add(server);
     this.notifyAll();
     log.error("Server side error on " + server + ": " + e);
   }
-  
+
   private synchronized void updateUnknownErrors(String msg, Throwable t) {
     somethingFailed = true;
     unknownErrors++;
@@ -510,29 +510,29 @@ public class TabletServerBatchWriter {
     else
       log.error(msg, t);
   }
-  
+
   private void checkForFailures() throws MutationsRejectedException {
     if (somethingFailed) {
       List<ConstraintViolationSummary> cvsList = violations.asList();
       HashMap<KeyExtent,Set<org.apache.accumulo.core.client.security.SecurityErrorCode>> af = new HashMap<KeyExtent,Set<org.apache.accumulo.core.client.security.SecurityErrorCode>>();
       for (Entry<KeyExtent,Set<SecurityErrorCode>> entry : authorizationFailures.entrySet()) {
         HashSet<org.apache.accumulo.core.client.security.SecurityErrorCode> codes = new HashSet<org.apache.accumulo.core.client.security.SecurityErrorCode>();
-        
+
         for (SecurityErrorCode sce : entry.getValue()) {
           codes.add(org.apache.accumulo.core.client.security.SecurityErrorCode.valueOf(sce.name()));
         }
-        
+
         af.put(entry.getKey(), codes);
       }
-      
+
       throw new MutationsRejectedException(context.getInstance(), cvsList, af, serverSideErrors, unknownErrors, lastUnknownError);
     }
   }
-  
+
   // END code for handling unrecoverable errors
-  
+
   // BEGIN code for handling failed mutations
-  
+
   /**
    * Add mutations that previously failed back into the mix
    */
@@ -542,16 +542,16 @@ public class TabletServerBatchWriter {
       startProcessing();
     }
   }
-  
+
   private class FailedMutations extends TimerTask {
-    
+
     private MutationSet recentFailures = null;
     private long initTime;
-    
+
     FailedMutations() {
       jtimer.schedule(this, 0, 500);
     }
-    
+
     private MutationSet init() {
       if (recentFailures == null) {
         recentFailures = new MutationSet();
@@ -559,35 +559,35 @@ public class TabletServerBatchWriter {
       }
       return recentFailures;
     }
-    
+
     synchronized void add(String table, ArrayList<Mutation> tableFailures) {
       init().addAll(table, tableFailures);
     }
-    
+
     synchronized void add(MutationSet failures) {
       init().addAll(failures);
     }
-    
+
     synchronized void add(String location, TabletServerMutations<Mutation> tsm) {
       init();
       for (Entry<KeyExtent,List<Mutation>> entry : tsm.getMutations().entrySet()) {
         recentFailures.addAll(entry.getKey().getTableId().toString(), entry.getValue());
       }
-      
+
     }
-    
+
     @Override
     public void run() {
       try {
         MutationSet rf = null;
-        
+
         synchronized (this) {
           if (recentFailures != null && System.currentTimeMillis() - initTime > 1000) {
             rf = recentFailures;
             recentFailures = null;
           }
         }
-        
+
         if (rf != null) {
           if (log.isTraceEnabled())
             log.trace("tid=" + Thread.currentThread().getId() + "  Requeuing " + rf.size() + " failed mutations");
@@ -599,26 +599,26 @@ public class TabletServerBatchWriter {
       }
     }
   }
-  
+
   // END code for handling failed mutations
-  
+
   // BEGIN code for sending mutations to tablet servers using background threads
-  
+
   private class MutationWriter {
-    
+
     private static final int MUTATION_BATCH_SIZE = 1 << 17;
     private final ExecutorService sendThreadPool;
     private final Map<String,TabletServerMutations<Mutation>> serversMutations;
     private final Set<String> queued;
     private final Map<String,TabletLocator> locators;
-    
+
     public MutationWriter(int numSendThreads) {
       serversMutations = new HashMap<String,TabletServerMutations<Mutation>>();
       queued = new HashSet<String>();
       sendThreadPool = new SimpleThreadPool(numSendThreads, this.getClass().getName());
       locators = new HashMap<String,TabletLocator>();
     }
-    
+
     private TabletLocator getLocator(String tableId) {
       TabletLocator ret = locators.get(tableId);
       if (ret == null) {
@@ -626,10 +626,10 @@ public class TabletServerBatchWriter {
         ret = new TimeoutTabletLocator(ret, timeout);
         locators.put(tableId, ret);
       }
-      
+
       return ret;
     }
-    
+
     private void binMutations(MutationSet mutationsToProcess, Map<String,TabletServerMutations<Mutation>> binnedMutations) {
       String tableId = null;
       try {
@@ -637,17 +637,17 @@ public class TabletServerBatchWriter {
         for (Entry<String,List<Mutation>> entry : es) {
           tableId = entry.getKey();
           TabletLocator locator = getLocator(tableId);
-          
+
           String table = entry.getKey();
           List<Mutation> tableMutations = entry.getValue();
-          
+
           if (tableMutations != null) {
             ArrayList<Mutation> tableFailures = new ArrayList<Mutation>();
             locator.binMutations(context, tableMutations, binnedMutations, tableFailures);
-            
+
             if (tableFailures.size() > 0) {
               failedMutations.add(table, tableFailures);
-              
+
               if (tableFailures.size() == tableMutations.size())
                 if (!Tables.exists(context.getInstance(), entry.getKey()))
                   throw new TableDeletedException(entry.getKey());
@@ -655,7 +655,7 @@ public class TabletServerBatchWriter {
                   throw new TableOfflineException(context.getInstance(), entry.getKey());
             }
           }
-          
+
         }
         return;
       } catch (AccumuloServerException ase) {
@@ -673,12 +673,12 @@ public class TabletServerBatchWriter {
       } catch (TableNotFoundException e) {
         updateUnknownErrors(e.getMessage(), e);
       }
-      
+
       // an error occurred
       binnedMutations.clear();
-      
+
     }
-    
+
     void addMutations(MutationSet mutationsToSend) {
       Map<String,TabletServerMutations<Mutation>> binnedMutations = new HashMap<String,TabletServerMutations<Mutation>>();
       Span span = Trace.start("binMutations");
@@ -692,17 +692,17 @@ public class TabletServerBatchWriter {
       }
       addMutations(binnedMutations);
     }
-    
+
     private synchronized void addMutations(Map<String,TabletServerMutations<Mutation>> binnedMutations) {
-      
+
       int count = 0;
-      
+
       // merge mutations into existing mutations for a tablet server
       for (Entry<String,TabletServerMutations<Mutation>> entry : binnedMutations.entrySet()) {
         String server = entry.getKey();
-        
+
         TabletServerMutations<Mutation> currentMutations = serversMutations.get(server);
-        
+
         if (currentMutations == null) {
           serversMutations.put(server, entry.getValue());
         } else {
@@ -712,136 +712,136 @@ public class TabletServerBatchWriter {
             }
           }
         }
-        
+
         if (log.isTraceEnabled())
           for (Entry<KeyExtent,List<Mutation>> entry2 : entry.getValue().getMutations().entrySet())
             count += entry2.getValue().size();
-        
+
       }
-      
+
       if (count > 0 && log.isTraceEnabled())
         log.trace(String.format("Started sending %,d mutations to %,d tablet servers", count, binnedMutations.keySet().size()));
-      
+
       // randomize order of servers
       ArrayList<String> servers = new ArrayList<String>(binnedMutations.keySet());
       Collections.shuffle(servers);
-      
+
       for (String server : servers)
         if (!queued.contains(server)) {
           sendThreadPool.submit(Trace.wrap(new SendTask(server)));
           queued.add(server);
         }
     }
-    
+
     private synchronized TabletServerMutations<Mutation> getMutationsToSend(String server) {
       TabletServerMutations<Mutation> tsmuts = serversMutations.remove(server);
       if (tsmuts == null)
         queued.remove(server);
-      
+
       return tsmuts;
     }
-    
+
     class SendTask implements Runnable {
-      
+
       final private String location;
-      
+
       SendTask(String server) {
         this.location = server;
       }
-      
+
       @Override
       public void run() {
         try {
           TabletServerMutations<Mutation> tsmuts = getMutationsToSend(location);
-          
+
           while (tsmuts != null) {
             send(tsmuts);
             tsmuts = getMutationsToSend(location);
           }
-          
+
           return;
         } catch (Throwable t) {
           updateUnknownErrors("Failed to send tablet server " + location + " its batch : " + t.getMessage(), t);
         }
       }
-      
+
       public void send(TabletServerMutations<Mutation> tsm) throws AccumuloServerException, AccumuloSecurityException {
-        
+
         MutationSet failures = null;
-        
+
         String oldName = Thread.currentThread().getName();
-        
+
         Map<KeyExtent,List<Mutation>> mutationBatch = tsm.getMutations();
         try {
-          
+
           long count = 0;
           for (List<Mutation> list : mutationBatch.values()) {
             count += list.size();
           }
           String msg = "sending " + String.format("%,d", count) + " mutations to " + String.format("%,d", mutationBatch.size()) + " tablets at " + location;
           Thread.currentThread().setName(msg);
-          
+
           Span span = Trace.start("sendMutations");
           try {
-            
+
             TimeoutTracker timeoutTracker = timeoutTrackers.get(location);
             if (timeoutTracker == null) {
               timeoutTracker = new TimeoutTracker(location, timeout);
               timeoutTrackers.put(location, timeoutTracker);
             }
-            
+
             long st1 = System.currentTimeMillis();
             failures = sendMutationsToTabletServer(location, mutationBatch, timeoutTracker);
             long st2 = System.currentTimeMillis();
             if (log.isTraceEnabled())
               log.trace("sent " + String.format("%,d", count) + " mutations to " + location + " in "
                   + String.format("%.2f secs (%,.2f mutations/sec) with %,d failures", (st2 - st1) / 1000.0, count / ((st2 - st1) / 1000.0), failures.size()));
-            
+
             long successBytes = 0;
             for (Entry<KeyExtent,List<Mutation>> entry : mutationBatch.entrySet()) {
               for (Mutation mutation : entry.getValue()) {
                 successBytes += mutation.estimatedMemoryUsed();
               }
             }
-            
+
             if (failures.size() > 0) {
               failedMutations.add(failures);
               successBytes -= failures.getMemoryUsed();
             }
-            
+
             updateSendStats(count, st2 - st1);
             decrementMemUsed(successBytes);
-            
+
           } finally {
             span.stop();
           }
         } catch (IOException e) {
           if (log.isTraceEnabled())
             log.trace("failed to send mutations to " + location + " : " + e.getMessage());
-          
+
           HashSet<String> tables = new HashSet<String>();
           for (KeyExtent ke : mutationBatch.keySet())
             tables.add(ke.getTableId().toString());
-          
+
           for (String table : tables)
             TabletLocator.getLocator(context, new Text(table)).invalidateCache(context.getInstance(), location);
-          
+
           failedMutations.add(location, tsm);
         } finally {
           Thread.currentThread().setName(oldName);
         }
       }
     }
-    
+
     private MutationSet sendMutationsToTabletServer(String location, Map<KeyExtent,List<Mutation>> tabMuts, TimeoutTracker timeoutTracker) throws IOException,
         AccumuloSecurityException, AccumuloServerException {
       if (tabMuts.size() == 0) {
         return new MutationSet();
       }
       TInfo tinfo = Tracer.traceInfo();
-      
+
       timeoutTracker.startingWrite();
-      
+
       try {
         final HostAndPort parsedServer = HostAndPort.fromString(location);
         final TabletClientService.Iface client;
@@ -853,10 +853,10 @@ public class TabletServerBatchWriter {
 
         try {
           MutationSet allFailures = new MutationSet();
-          
+
           if (tabMuts.size() == 1 && tabMuts.values().iterator().next().size() == 1) {
             Entry<KeyExtent,List<Mutation>> entry = tabMuts.entrySet().iterator().next();
-            
+
             try {
               client.update(tinfo, context.rpcCreds(), entry.getKey().toThrift(), entry.getValue().get(0).toThrift(), DurabilityImpl.toThrift(durability));
             } catch (NotServingTabletException e) {
@@ -867,9 +867,9 @@ public class TabletServerBatchWriter {
             }
             timeoutTracker.madeProgress();
           } else {
-            
+
             long usid = client.startUpdate(tinfo, context.rpcCreds(), DurabilityImpl.toThrift(durability));
-            
+
             List<TMutation> updates = new ArrayList<TMutation>();
             for (Entry<KeyExtent,List<Mutation>> entry : tabMuts.entrySet()) {
               long size = 0;
@@ -880,34 +880,34 @@ public class TabletServerBatchWriter {
                   updates.add(mutation.toThrift());
                   size += mutation.numBytes();
                 }
-                
+
                 client.applyUpdates(tinfo, usid, entry.getKey().toThrift(), updates);
                 updates.clear();
                 size = 0;
               }
             }
-            
+
             UpdateErrors updateErrors = client.closeUpdate(tinfo, usid);
-            
+
             Map<KeyExtent,Long> failures = Translator.translate(updateErrors.failedExtents, Translators.TKET);
             updatedConstraintViolations(Translator.translate(updateErrors.violationSummaries, Translators.TCVST));
             updateAuthorizationFailures(Translator.translate(updateErrors.authorizationFailures, Translators.TKET));
-            
+
             long totalCommitted = 0;
-            
+
             for (Entry<KeyExtent,Long> entry : failures.entrySet()) {
               KeyExtent failedExtent = entry.getKey();
               int numCommitted = (int) (long) entry.getValue();
               totalCommitted += numCommitted;
-              
+
               String table = failedExtent.getTableId().toString();
-              
+
               TabletLocator.getLocator(context, new Text(table)).invalidateCache(failedExtent);
-              
+
               ArrayList<Mutation> mutations = (ArrayList<Mutation>) tabMuts.get(failedExtent);
               allFailures.addAll(table, mutations.subList(numCommitted, mutations.size()));
             }
-            
+
             if (failures.keySet().containsAll(tabMuts.keySet()) && totalCommitted == 0) {
               // nothing was successfully written
               timeoutTracker.wroteNothing();
@@ -936,34 +936,34 @@ public class TabletServerBatchWriter {
       }
     }
   }
-  
+
   // END code for sending mutations to tablet servers using background threads
-  
+
   private static class MutationSet {
-    
+
     private final HashMap<String,List<Mutation>> mutations;
     private int memoryUsed = 0;
-    
+
     MutationSet() {
       mutations = new HashMap<String,List<Mutation>>();
     }
-    
+
     void addMutation(String table, Mutation mutation) {
       List<Mutation> tabMutList = mutations.get(table);
       if (tabMutList == null) {
         tabMutList = new ArrayList<Mutation>();
         mutations.put(table, tabMutList);
       }
-      
+
       tabMutList.add(mutation);
-      
+
       memoryUsed += mutation.estimatedMemoryUsed();
     }
-    
+
     Map<String,List<Mutation>> getMutations() {
       return mutations;
     }
-    
+
     int size() {
       int result = 0;
       for (List<Mutation> perTable : mutations.values()) {
@@ -971,28 +971,28 @@ public class TabletServerBatchWriter {
       }
       return result;
     }
-    
+
     public void addAll(MutationSet failures) {
       Set<Entry<String,List<Mutation>>> es = failures.getMutations().entrySet();
-      
+
       for (Entry<String,List<Mutation>> entry : es) {
         String table = entry.getKey();
-        
+
         for (Mutation mutation : entry.getValue()) {
           addMutation(table, mutation);
         }
       }
     }
-    
+
     public void addAll(String table, List<Mutation> mutations) {
       for (Mutation mutation : mutations) {
         addMutation(table, mutation);
       }
     }
-    
+
     public int getMemoryUsed() {
       return memoryUsed;
     }
-    
+
   }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/6bc67602/core/src/main/java/org/apache/accumulo/core/client/impl/TabletType.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/TabletType.java b/core/src/main/java/org/apache/accumulo/core/client/impl/TabletType.java
index 3adcca9..d57bf94 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/TabletType.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/TabletType.java
@@ -22,7 +22,7 @@ import org.apache.accumulo.core.data.KeyExtent;
 
 public enum TabletType {
   ROOT, METADATA, USER;
-  
+
   public static TabletType type(KeyExtent ke) {
     if (ke.isRootTablet())
       return ROOT;
@@ -30,20 +30,20 @@ public enum TabletType {
       return METADATA;
     return USER;
   }
-  
+
   public static TabletType type(Collection<KeyExtent> extents) {
     if (extents.size() == 0)
       throw new IllegalArgumentException();
-    
+
     TabletType ttype = null;
-    
+
     for (KeyExtent extent : extents) {
       if (ttype == null)
         ttype = type(extent);
       else if (ttype != type(extent))
         throw new IllegalArgumentException("multiple extent types not allowed " + ttype + " " + type(extent));
     }
-    
+
     return ttype;
   }
 }


Mime
View raw message