accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From e..@apache.org
Subject [5/6] git commit: ACCUMULO-2408 organize static final members of abstract class to a concrete implementation
Date Wed, 26 Feb 2014 19:40:39 GMT
ACCUMULO-2408 organize static final members of abstract class to a concrete implementation


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/0bd62e39
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/0bd62e39
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/0bd62e39

Branch: refs/heads/1.5.1-SNAPSHOT
Commit: 0bd62e390b737d66b2734d3890dbd4f3ca846589
Parents: 6258018 57b2e5c
Author: Eric Newton <eric.newton@gmail.com>
Authored: Wed Feb 26 14:28:24 2014 -0500
Committer: Eric Newton <eric.newton@gmail.com>
Committed: Wed Feb 26 14:28:24 2014 -0500

----------------------------------------------------------------------
 .../impl/TabletServerBatchReaderIterator.java   | 12 +++----
 .../client/impl/TabletServerBatchWriter.java    |  8 ++---
 .../core/client/impl/ThriftScanner.java         |  4 +--
 .../accumulo/core/client/impl/Translator.java   |  9 -----
 .../accumulo/core/client/impl/Translators.java  | 37 ++++++++++++++++++++
 .../accumulo/server/client/BulkImporter.java    |  5 +--
 .../server/tabletserver/TabletServer.java       | 17 ++++-----
 .../mastermessage/SplitReportMessage.java       |  3 +-
 8 files changed, 63 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/0bd62e39/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReaderIterator.java
----------------------------------------------------------------------
diff --cc core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReaderIterator.java
index 6c6d1bb,0000000..990a577
mode 100644,000000..100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReaderIterator.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReaderIterator.java
@@@ -1,734 -1,0 +1,734 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.core.client.impl;
 +
 +import java.io.IOException;
 +import java.util.ArrayList;
 +import java.util.Collection;
 +import java.util.Collections;
 +import java.util.HashMap;
 +import java.util.HashSet;
 +import java.util.Iterator;
 +import java.util.List;
 +import java.util.ListIterator;
 +import java.util.Map;
 +import java.util.Map.Entry;
 +import java.util.NoSuchElementException;
 +import java.util.Set;
 +import java.util.concurrent.ArrayBlockingQueue;
 +import java.util.concurrent.ExecutorService;
 +import java.util.concurrent.Semaphore;
 +import java.util.concurrent.TimeUnit;
 +
 +import org.apache.accumulo.core.Constants;
 +import org.apache.accumulo.core.client.AccumuloException;
 +import org.apache.accumulo.core.client.AccumuloSecurityException;
 +import org.apache.accumulo.core.client.Instance;
 +import org.apache.accumulo.core.client.TableDeletedException;
 +import org.apache.accumulo.core.client.TableNotFoundException;
 +import org.apache.accumulo.core.client.TableOfflineException;
 +import org.apache.accumulo.core.client.TimedOutException;
 +import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException;
 +import org.apache.accumulo.core.conf.AccumuloConfiguration;
 +import org.apache.accumulo.core.conf.Property;
 +import org.apache.accumulo.core.data.Column;
 +import org.apache.accumulo.core.data.Key;
 +import org.apache.accumulo.core.data.KeyExtent;
 +import org.apache.accumulo.core.data.Range;
 +import org.apache.accumulo.core.data.Value;
 +import org.apache.accumulo.core.data.thrift.InitialMultiScan;
 +import org.apache.accumulo.core.data.thrift.MultiScanResult;
 +import org.apache.accumulo.core.data.thrift.TKeyExtent;
 +import org.apache.accumulo.core.data.thrift.TKeyValue;
 +import org.apache.accumulo.core.data.thrift.TRange;
 +import org.apache.accumulo.core.master.state.tables.TableState;
 +import org.apache.accumulo.core.security.Authorizations;
 +import org.apache.accumulo.core.security.thrift.TCredentials;
 +import org.apache.accumulo.core.tabletserver.thrift.NoSuchScanIDException;
 +import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
 +import org.apache.accumulo.core.util.ByteBufferUtil;
 +import org.apache.accumulo.core.util.OpTimer;
 +import org.apache.accumulo.core.util.ThriftUtil;
 +import org.apache.accumulo.trace.instrument.TraceRunnable;
 +import org.apache.accumulo.trace.instrument.Tracer;
 +import org.apache.hadoop.io.Text;
 +import org.apache.log4j.Level;
 +import org.apache.log4j.Logger;
 +import org.apache.thrift.TApplicationException;
 +import org.apache.thrift.TException;
 +import org.apache.thrift.transport.TTransport;
 +import org.apache.thrift.transport.TTransportException;
 +
 +public class TabletServerBatchReaderIterator implements Iterator<Entry<Key,Value>> {
 +  
 +  private static final Logger log = Logger.getLogger(TabletServerBatchReaderIterator.class);
 +  
 +  private final Instance instance;
 +  private final TCredentials credentials;
 +  private final String table;
 +  private Authorizations authorizations = Constants.NO_AUTHS;
 +  private final int numThreads;
 +  private final ExecutorService queryThreadPool;
 +  private final ScannerOptions options;
 +  
 +  private ArrayBlockingQueue<List<Entry<Key,Value>>> resultsQueue;
 +  private Iterator<Entry<Key,Value>> batchIterator;
 +  private List<Entry<Key,Value>> batch;
 +  private static final List<Entry<Key,Value>> LAST_BATCH = new ArrayList<Map.Entry<Key,Value>>();
 +  private final Object nextLock = new Object();
 +  
 +  private long failSleepTime = 100;
 +  
 +  private volatile Throwable fatalException = null;
 +  
 +  private Map<String,TimeoutTracker> timeoutTrackers;
 +  private Set<String> timedoutServers;
 +  private long timeout;
 +  
 +  private TabletLocator locator;
 +  
 +  public interface ResultReceiver {
 +    void receive(List<Entry<Key,Value>> entries);
 +  }
 +  
 +  private static class MyEntry implements Entry<Key,Value> {
 +    
 +    private Key key;
 +    private Value value;
 +    
 +    MyEntry(Key key, Value value) {
 +      this.key = key;
 +      this.value = value;
 +    }
 +    
 +    @Override
 +    public Key getKey() {
 +      return key;
 +    }
 +    
 +    @Override
 +    public Value getValue() {
 +      return value;
 +    }
 +    
 +    @Override
 +    public Value setValue(Value value) {
 +      throw new UnsupportedOperationException();
 +    }
 +    
 +  }
 +  
 +  public TabletServerBatchReaderIterator(Instance instance, TCredentials credentials, String table, Authorizations authorizations, ArrayList<Range> ranges,
 +      int numThreads, ExecutorService queryThreadPool, ScannerOptions scannerOptions, long timeout) {
 +    
 +    this.instance = instance;
 +    this.credentials = credentials;
 +    this.table = table;
 +    this.authorizations = authorizations;
 +    this.numThreads = numThreads;
 +    this.queryThreadPool = queryThreadPool;
 +    this.options = new ScannerOptions(scannerOptions);
 +    resultsQueue = new ArrayBlockingQueue<List<Entry<Key,Value>>>(numThreads);
 +    
 +    this.locator = new TimeoutTabletLocator(TabletLocator.getInstance(instance, new Text(table)), timeout);
 +    
 +    timeoutTrackers = Collections.synchronizedMap(new HashMap<String,TabletServerBatchReaderIterator.TimeoutTracker>());
 +    timedoutServers = Collections.synchronizedSet(new HashSet<String>());
 +    this.timeout = timeout;
 +    
 +    if (options.fetchedColumns.size() > 0) {
 +      ArrayList<Range> ranges2 = new ArrayList<Range>(ranges.size());
 +      for (Range range : ranges) {
 +        ranges2.add(range.bound(options.fetchedColumns.first(), options.fetchedColumns.last()));
 +      }
 +      
 +      ranges = ranges2;
 +    }
 +    
 +    ResultReceiver rr = new ResultReceiver() {
 +      
 +      @Override
 +      public void receive(List<Entry<Key,Value>> entries) {
 +        try {
 +          resultsQueue.put(entries);
 +        } catch (InterruptedException e) {
 +          if (TabletServerBatchReaderIterator.this.queryThreadPool.isShutdown())
 +            log.debug("Failed to add Batch Scan result", e);
 +          else
 +            log.warn("Failed to add Batch Scan result", e);
 +          fatalException = e;
 +          throw new RuntimeException(e);
 +          
 +        }
 +      }
 +      
 +    };
 +    
 +    try {
 +      lookup(ranges, rr);
 +    } catch (RuntimeException re) {
 +      throw re;
 +    } catch (Exception e) {
 +      throw new RuntimeException("Failed to create iterator", e);
 +    }
 +  }
 +  
 +  @Override
 +  public boolean hasNext() {
 +    synchronized (nextLock) {
 +      if (batch == LAST_BATCH)
 +        return false;
 +      
 +      if (batch != null && batchIterator.hasNext())
 +        return true;
 +      
 +      // don't have one cached, try to cache one and return success
 +      try {
 +        batch = null;
 +        while (batch == null && fatalException == null && !queryThreadPool.isShutdown())
 +          batch = resultsQueue.poll(1, TimeUnit.SECONDS);
 +        
 +        if (fatalException != null)
 +          if (fatalException instanceof RuntimeException)
 +            throw (RuntimeException) fatalException;
 +          else
 +            throw new RuntimeException(fatalException);
 +        
 +        if (queryThreadPool.isShutdown())
 +          throw new RuntimeException("scanner closed");
 +        
 +        batchIterator = batch.iterator();
 +        return batch != LAST_BATCH;
 +      } catch (InterruptedException e) {
 +        throw new RuntimeException(e);
 +      }
 +    }
 +  }
 +  
 +  @Override
 +  public Entry<Key,Value> next() {
 +    // if there's one waiting, or hasNext() can get one, return it
 +    synchronized (nextLock) {
 +      if (hasNext())
 +        return batchIterator.next();
 +      else
 +        throw new NoSuchElementException();
 +    }
 +  }
 +  
 +  @Override
 +  public void remove() {
 +    throw new UnsupportedOperationException();
 +  }
 +  
 +  private synchronized void lookup(List<Range> ranges, ResultReceiver receiver) throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
 +    List<Column> columns = new ArrayList<Column>(options.fetchedColumns);
 +    ranges = Range.mergeOverlapping(ranges);
 +    
 +    Map<String,Map<KeyExtent,List<Range>>> binnedRanges = new HashMap<String,Map<KeyExtent,List<Range>>>();
 +    
 +    binRanges(locator, ranges, binnedRanges);
 +    
 +    doLookups(binnedRanges, receiver, columns);
 +  }
 +  
 +  private void binRanges(TabletLocator tabletLocator, List<Range> ranges, Map<String,Map<KeyExtent,List<Range>>> binnedRanges) throws AccumuloException,
 +      AccumuloSecurityException, TableNotFoundException {
 +    
 +    int lastFailureSize = Integer.MAX_VALUE;
 +    
 +    while (true) {
 +      
 +      binnedRanges.clear();
 +      List<Range> failures = tabletLocator.binRanges(ranges, binnedRanges, credentials);
 +      
 +      if (failures.size() > 0) {
 +        // tried to only do table state checks when failures.size() == ranges.size(), however this did
 +        // not work because nothing ever invalidated entries in the tabletLocator cache... so even though
 +        // the table was deleted the tablet locator entries for the deleted table were not cleared... so
 +        // need to always do the check when failures occur
 +        if (failures.size() >= lastFailureSize)
 +          if (!Tables.exists(instance, table))
 +            throw new TableDeletedException(table);
 +          else if (Tables.getTableState(instance, table) == TableState.OFFLINE)
 +            throw new TableOfflineException(instance, table);
 +        
 +        lastFailureSize = failures.size();
 +        
 +        if (log.isTraceEnabled())
 +          log.trace("Failed to bin " + failures.size() + " ranges, tablet locations were null, retrying in 100ms");
 +        try {
 +          Thread.sleep(100);
 +        } catch (InterruptedException e) {
 +          throw new RuntimeException(e);
 +        }
 +      } else {
 +        break;
 +      }
 +      
 +    }
 +    
 +    // truncate the ranges to within the tablets... this makes it easier to know what work
 +    // needs to be redone when failures occur and tablets have merged or split
 +    Map<String,Map<KeyExtent,List<Range>>> binnedRanges2 = new HashMap<String,Map<KeyExtent,List<Range>>>();
 +    for (Entry<String,Map<KeyExtent,List<Range>>> entry : binnedRanges.entrySet()) {
 +      Map<KeyExtent,List<Range>> tabletMap = new HashMap<KeyExtent,List<Range>>();
 +      binnedRanges2.put(entry.getKey(), tabletMap);
 +      for (Entry<KeyExtent,List<Range>> tabletRanges : entry.getValue().entrySet()) {
 +        Range tabletRange = tabletRanges.getKey().toDataRange();
 +        List<Range> clippedRanges = new ArrayList<Range>();
 +        tabletMap.put(tabletRanges.getKey(), clippedRanges);
 +        for (Range range : tabletRanges.getValue())
 +          clippedRanges.add(tabletRange.clip(range));
 +      }
 +    }
 +    
 +    binnedRanges.clear();
 +    binnedRanges.putAll(binnedRanges2);
 +  }
 +  
 +  private void processFailures(Map<KeyExtent,List<Range>> failures, ResultReceiver receiver, List<Column> columns) throws AccumuloException,
 +      AccumuloSecurityException, TableNotFoundException {
 +    if (log.isTraceEnabled())
 +      log.trace("Failed to execute multiscans against " + failures.size() + " tablets, retrying...");
 +    
 +    try {
 +      Thread.sleep(failSleepTime);
 +    } catch (InterruptedException e) {
 +      Thread.currentThread().interrupt();
 +      
 +      // We were interrupted (close called on batchscanner) just exit
 +      log.debug("Exiting failure processing on interrupt");
 +      return;
 +    }
 +
 +    failSleepTime = Math.min(5000, failSleepTime * 2);
 +    
 +    Map<String,Map<KeyExtent,List<Range>>> binnedRanges = new HashMap<String,Map<KeyExtent,List<Range>>>();
 +    List<Range> allRanges = new ArrayList<Range>();
 +    
 +    for (List<Range> ranges : failures.values())
 +      allRanges.addAll(ranges);
 +    
 +    // since the first call to binRanges clipped the ranges to within a tablet, we should only
 +    // bin to the set of failed tablets
 +    binRanges(locator, allRanges, binnedRanges);
 +    
 +    doLookups(binnedRanges, receiver, columns);
 +  }
 +  
 +  private class QueryTask implements Runnable {
 +    
 +    private String tsLocation;
 +    private Map<KeyExtent,List<Range>> tabletsRanges;
 +    private ResultReceiver receiver;
 +    private Semaphore semaphore = null;
 +    private final Map<KeyExtent,List<Range>> failures;
 +    private List<Column> columns;
 +    private int semaphoreSize;
 +    
 +    QueryTask(String tsLocation, Map<KeyExtent,List<Range>> tabletsRanges, Map<KeyExtent,List<Range>> failures, ResultReceiver receiver, List<Column> columns) {
 +      this.tsLocation = tsLocation;
 +      this.tabletsRanges = tabletsRanges;
 +      this.receiver = receiver;
 +      this.columns = columns;
 +      this.failures = failures;
 +    }
 +    
 +    void setSemaphore(Semaphore semaphore, int semaphoreSize) {
 +      this.semaphore = semaphore;
 +      this.semaphoreSize = semaphoreSize;
 +    }
 +    
 +    @Override
 +    public void run() {
 +      String threadName = Thread.currentThread().getName();
 +      Thread.currentThread().setName(threadName + " looking up " + tabletsRanges.size() + " ranges at " + tsLocation);
 +      Map<KeyExtent,List<Range>> unscanned = new HashMap<KeyExtent,List<Range>>();
 +      Map<KeyExtent,List<Range>> tsFailures = new HashMap<KeyExtent,List<Range>>();
 +      try {
 +        TimeoutTracker timeoutTracker = timeoutTrackers.get(tsLocation);
 +        if (timeoutTracker == null) {
 +          timeoutTracker = new TimeoutTracker(tsLocation, timedoutServers, timeout);
 +          timeoutTrackers.put(tsLocation, timeoutTracker);
 +        }
 +        doLookup(tsLocation, tabletsRanges, tsFailures, unscanned, receiver, columns, credentials, options, authorizations, instance.getConfiguration(),
 +            timeoutTracker);
 +        if (tsFailures.size() > 0) {
 +          locator.invalidateCache(tsFailures.keySet());
 +          synchronized (failures) {
 +            failures.putAll(tsFailures);
 +          }
 +        }
 +        
 +      } catch (IOException e) {
 +        synchronized (failures) {
 +          failures.putAll(tsFailures);
 +          failures.putAll(unscanned);
 +        }
 +        
 +        locator.invalidateCache(tsLocation);
 +        log.debug(e.getMessage(), e);
 +      } catch (AccumuloSecurityException e) {
 +        log.debug(e.getMessage(), e);
 +        
 +        Tables.clearCache(instance);
 +        if (!Tables.exists(instance, table))
 +          fatalException = new TableDeletedException(table);
 +        else
 +          fatalException = e;
 +      } catch (Throwable t) {
 +        if (queryThreadPool.isShutdown())
 +          log.debug(t.getMessage(), t);
 +        else
 +          log.warn(t.getMessage(), t);
 +        fatalException = t;
 +      } finally {
 +        semaphore.release();
 +        Thread.currentThread().setName(threadName);
 +        if (semaphore.tryAcquire(semaphoreSize)) {
 +          // finished processing all queries
 +          if (fatalException == null && failures.size() > 0) {
 +            // there were some failures
 +            try {
 +              processFailures(failures, receiver, columns);
 +            } catch (TableNotFoundException e) {
 +              log.debug(e.getMessage(), e);
 +              fatalException = e;
 +            } catch (AccumuloException e) {
 +              log.debug(e.getMessage(), e);
 +              fatalException = e;
 +            } catch (AccumuloSecurityException e) {
 +              log.debug(e.getMessage(), e);
 +              fatalException = e;
 +            } catch (Throwable t) {
 +              log.debug(t.getMessage(), t);
 +              fatalException = t;
 +            }
 +            
 +            if (fatalException != null) {
 +              // we are finished with this batch query
 +              if (!resultsQueue.offer(LAST_BATCH)) {
 +                log.debug("Could not add to result queue after seeing fatalException in processFailures", fatalException);
 +              }
 +            }
 +          } else {
 +            // we are finished with this batch query
 +            if (fatalException != null) {
 +              if (!resultsQueue.offer(LAST_BATCH)) {
 +                log.debug("Could not add to result queue after seeing fatalException", fatalException);
 +              }
 +            } else {
 +              try {
 +                resultsQueue.put(LAST_BATCH);
 +              } catch (InterruptedException e) {
 +                fatalException = e;
 +                if (!resultsQueue.offer(LAST_BATCH)) {
 +                  log.debug("Could not add to result queue after seeing fatalException", fatalException);
 +                }
 +              }
 +            }
 +          }
 +        }
 +      }
 +    }
 +    
 +  }
 +  
 +  private void doLookups(Map<String,Map<KeyExtent,List<Range>>> binnedRanges, final ResultReceiver receiver, List<Column> columns) {
 +    
 +    if (timedoutServers.containsAll(binnedRanges.keySet())) {
 +      // all servers have timed out
 +      throw new TimedOutException(timedoutServers);
 +    }
 +    
 +    // when there are lots of threads and a few tablet servers
 +    // it is good to break request to tablet servers up, the
 +    // following code determines if this is the case
 +    int maxTabletsPerRequest = Integer.MAX_VALUE;
 +    if (numThreads / binnedRanges.size() > 1) {
 +      int totalNumberOfTablets = 0;
 +      for (Entry<String,Map<KeyExtent,List<Range>>> entry : binnedRanges.entrySet()) {
 +        totalNumberOfTablets += entry.getValue().size();
 +      }
 +      
 +      maxTabletsPerRequest = totalNumberOfTablets / numThreads;
 +      if (maxTabletsPerRequest == 0) {
 +        maxTabletsPerRequest = 1;
 +      }
 +      
 +    }
 +    
 +    Map<KeyExtent,List<Range>> failures = new HashMap<KeyExtent,List<Range>>();
 +    
 +    if (timedoutServers.size() > 0) {
 +      // go ahead and fail any timed out servers
 +      for (Iterator<Entry<String,Map<KeyExtent,List<Range>>>> iterator = binnedRanges.entrySet().iterator(); iterator.hasNext();) {
 +        Entry<String,Map<KeyExtent,List<Range>>> entry = iterator.next();
 +        if (timedoutServers.contains(entry.getKey())) {
 +          failures.putAll(entry.getValue());
 +          iterator.remove();
 +        }
 +      }
 +    }
 +    
 +    // randomize tabletserver order... this will help when there are multiple
 +    // batch readers and writers running against accumulo
 +    List<String> locations = new ArrayList<String>(binnedRanges.keySet());
 +    Collections.shuffle(locations);
 +    
 +    List<QueryTask> queryTasks = new ArrayList<QueryTask>();
 +    
 +    for (final String tsLocation : locations) {
 +      
 +      final Map<KeyExtent,List<Range>> tabletsRanges = binnedRanges.get(tsLocation);
 +      if (maxTabletsPerRequest == Integer.MAX_VALUE || tabletsRanges.size() == 1) {
 +        QueryTask queryTask = new QueryTask(tsLocation, tabletsRanges, failures, receiver, columns);
 +        queryTasks.add(queryTask);
 +      } else {
 +        HashMap<KeyExtent,List<Range>> tabletSubset = new HashMap<KeyExtent,List<Range>>();
 +        for (Entry<KeyExtent,List<Range>> entry : tabletsRanges.entrySet()) {
 +          tabletSubset.put(entry.getKey(), entry.getValue());
 +          if (tabletSubset.size() >= maxTabletsPerRequest) {
 +            QueryTask queryTask = new QueryTask(tsLocation, tabletSubset, failures, receiver, columns);
 +            queryTasks.add(queryTask);
 +            tabletSubset = new HashMap<KeyExtent,List<Range>>();
 +          }
 +        }
 +        
 +        if (tabletSubset.size() > 0) {
 +          QueryTask queryTask = new QueryTask(tsLocation, tabletSubset, failures, receiver, columns);
 +          queryTasks.add(queryTask);
 +        }
 +      }
 +    }
 +    
 +    final Semaphore semaphore = new Semaphore(queryTasks.size());
 +    semaphore.acquireUninterruptibly(queryTasks.size());
 +    
 +    for (QueryTask queryTask : queryTasks) {
 +      queryTask.setSemaphore(semaphore, queryTasks.size());
 +      queryThreadPool.execute(new TraceRunnable(queryTask));
 +    }
 +  }
 +  
 +  static void trackScanning(Map<KeyExtent,List<Range>> failures, Map<KeyExtent,List<Range>> unscanned, MultiScanResult scanResult) {
 +    
 +    // translate returned failures, remove them from unscanned, and add them to failures
-     Map<KeyExtent,List<Range>> retFailures = Translator.translate(scanResult.failures, Translator.TKET, new Translator.ListTranslator<TRange,Range>(
-         Translator.TRT));
++    Map<KeyExtent,List<Range>> retFailures = Translator.translate(scanResult.failures, Translators.TKET, new Translator.ListTranslator<TRange,Range>(
++        Translators.TRT));
 +    unscanned.keySet().removeAll(retFailures.keySet());
 +    failures.putAll(retFailures);
 +    
 +    // translate full scans and remove them from unscanned
-     HashSet<KeyExtent> fullScans = new HashSet<KeyExtent>(Translator.translate(scanResult.fullScans, Translator.TKET));
++    HashSet<KeyExtent> fullScans = new HashSet<KeyExtent>(Translator.translate(scanResult.fullScans, Translators.TKET));
 +    unscanned.keySet().removeAll(fullScans);
 +    
 +    // remove partial scan from unscanned
 +    if (scanResult.partScan != null) {
 +      KeyExtent ke = new KeyExtent(scanResult.partScan);
 +      Key nextKey = new Key(scanResult.partNextKey);
 +      
 +      ListIterator<Range> iterator = unscanned.get(ke).listIterator();
 +      while (iterator.hasNext()) {
 +        Range range = iterator.next();
 +        
 +        if (range.afterEndKey(nextKey) || (nextKey.equals(range.getEndKey()) && scanResult.partNextKeyInclusive != range.isEndKeyInclusive())) {
 +          iterator.remove();
 +        } else if (range.contains(nextKey)) {
 +          iterator.remove();
 +          Range partRange = new Range(nextKey, scanResult.partNextKeyInclusive, range.getEndKey(), range.isEndKeyInclusive());
 +          iterator.add(partRange);
 +        }
 +      }
 +    }
 +  }
 +  
 +  private static class TimeoutTracker {
 +    
 +    String server;
 +    Set<String> badServers;
 +    long timeOut;
 +    long activityTime;
 +    Long firstErrorTime = null;
 +    
 +    TimeoutTracker(String server, Set<String> badServers, long timeOut) {
 +      this(timeOut);
 +      this.server = server;
 +      this.badServers = badServers;
 +    }
 +    
 +    TimeoutTracker(long timeOut) {
 +      this.timeOut = timeOut;
 +    }
 +    
 +    void startingScan() {
 +      activityTime = System.currentTimeMillis();
 +    }
 +    
 +    void check() throws IOException {
 +      if (System.currentTimeMillis() - activityTime > timeOut) {
 +        badServers.add(server);
 +        throw new IOException("Time exceeded " + (System.currentTimeMillis() - activityTime) + " " + server);
 +      }
 +    }
 +    
 +    void madeProgress() {
 +      activityTime = System.currentTimeMillis();
 +      firstErrorTime = null;
 +    }
 +    
 +    void errorOccured(Exception e) {
 +      if (firstErrorTime == null) {
 +        firstErrorTime = activityTime;
 +      } else if (System.currentTimeMillis() - firstErrorTime > timeOut) {
 +        badServers.add(server);
 +      }
 +    }
 +    
 +    /**
 +     */
 +    public long getTimeOut() {
 +      return timeOut;
 +    }
 +  }
 +  
 +  static void doLookup(String server, Map<KeyExtent,List<Range>> requested, Map<KeyExtent,List<Range>> failures, Map<KeyExtent,List<Range>> unscanned,
 +      ResultReceiver receiver, List<Column> columns, TCredentials credentials, ScannerOptions options, Authorizations authorizations, AccumuloConfiguration conf)
 +      throws IOException, AccumuloSecurityException, AccumuloServerException {
 +    doLookup(server, requested, failures, unscanned, receiver, columns, credentials, options, authorizations, conf, new TimeoutTracker(Long.MAX_VALUE));
 +  }
 +  
 +  static void doLookup(String server, Map<KeyExtent,List<Range>> requested, Map<KeyExtent,List<Range>> failures, Map<KeyExtent,List<Range>> unscanned,
 +      ResultReceiver receiver, List<Column> columns, TCredentials credentials, ScannerOptions options, Authorizations authorizations, AccumuloConfiguration conf,
 +      TimeoutTracker timeoutTracker) throws IOException, AccumuloSecurityException, AccumuloServerException {
 +    
 +    if (requested.size() == 0) {
 +      return;
 +    }
 +    
 +    // copy requested to unscanned map. we will remove ranges as they are scanned in trackScanning()
 +    for (Entry<KeyExtent,List<Range>> entry : requested.entrySet()) {
 +      ArrayList<Range> ranges = new ArrayList<Range>();
 +      for (Range range : entry.getValue()) {
 +        ranges.add(new Range(range));
 +      }
 +      unscanned.put(new KeyExtent(entry.getKey()), ranges);
 +    }
 +    
 +    timeoutTracker.startingScan();
 +    TTransport transport = null;
 +    try {
 +      TabletClientService.Client client;
 +      if (timeoutTracker.getTimeOut() < conf.getTimeInMillis(Property.GENERAL_RPC_TIMEOUT))
 +        client = ThriftUtil.getTServerClient(server, conf, timeoutTracker.getTimeOut());
 +      else
 +        client = ThriftUtil.getTServerClient(server, conf);
 +      
 +      try {
 +        
 +        OpTimer opTimer = new OpTimer(log, Level.TRACE).start("Starting multi scan, tserver=" + server + "  #tablets=" + requested.size() + "  #ranges="
 +            + sumSizes(requested.values()) + " ssil=" + options.serverSideIteratorList + " ssio=" + options.serverSideIteratorOptions);
 +        
 +        TabletType ttype = TabletType.type(requested.keySet());
 +        boolean waitForWrites = !ThriftScanner.serversWaitedForWrites.get(ttype).contains(server);
 +        
-         Map<TKeyExtent,List<TRange>> thriftTabletRanges = Translator.translate(requested, Translator.KET, new Translator.ListTranslator<Range,TRange>(
-             Translator.RT));
-         InitialMultiScan imsr = client.startMultiScan(Tracer.traceInfo(), credentials, thriftTabletRanges, Translator.translate(columns, Translator.CT),
++        Map<TKeyExtent,List<TRange>> thriftTabletRanges = Translator.translate(requested, Translators.KET, new Translator.ListTranslator<Range,TRange>(
++            Translators.RT));
++        InitialMultiScan imsr = client.startMultiScan(Tracer.traceInfo(), credentials, thriftTabletRanges, Translator.translate(columns, Translators.CT),
 +            options.serverSideIteratorList, options.serverSideIteratorOptions, ByteBufferUtil.toByteBuffers(authorizations.getAuthorizations()), waitForWrites);
 +        if (waitForWrites)
 +          ThriftScanner.serversWaitedForWrites.get(ttype).add(server);
 +        
 +        MultiScanResult scanResult = imsr.result;
 +        
 +        opTimer.stop("Got 1st multi scan results, #results=" + scanResult.results.size() + (scanResult.more ? "  scanID=" + imsr.scanID : "")
 +            + " in %DURATION%");
 +        
 +        ArrayList<Entry<Key,Value>> entries = new ArrayList<Map.Entry<Key,Value>>(scanResult.results.size());
 +        for (TKeyValue kv : scanResult.results) {
 +          entries.add(new MyEntry(new Key(kv.key), new Value(kv.value)));
 +        }
 +        
 +        if (entries.size() > 0)
 +          receiver.receive(entries);
 +        
 +        if (entries.size() > 0 || scanResult.fullScans.size() > 0)
 +          timeoutTracker.madeProgress();
 +        
 +        trackScanning(failures, unscanned, scanResult);
 +        
 +        while (scanResult.more) {
 +          
 +          timeoutTracker.check();
 +          
 +          opTimer.start("Continuing multi scan, scanid=" + imsr.scanID);
 +          scanResult = client.continueMultiScan(Tracer.traceInfo(), imsr.scanID);
 +          opTimer.stop("Got more multi scan results, #results=" + scanResult.results.size() + (scanResult.more ? "  scanID=" + imsr.scanID : "")
 +              + " in %DURATION%");
 +          
 +          entries = new ArrayList<Map.Entry<Key,Value>>(scanResult.results.size());
 +          for (TKeyValue kv : scanResult.results) {
 +            entries.add(new MyEntry(new Key(kv.key), new Value(kv.value)));
 +          }
 +          
 +          if (entries.size() > 0)
 +            receiver.receive(entries);
 +          
 +          if (entries.size() > 0 || scanResult.fullScans.size() > 0)
 +            timeoutTracker.madeProgress();
 +          
 +          trackScanning(failures, unscanned, scanResult);
 +        }
 +        
 +        client.closeMultiScan(Tracer.traceInfo(), imsr.scanID);
 +        
 +      } finally {
 +        ThriftUtil.returnClient(client);
 +      }
 +    } catch (TTransportException e) {
 +      log.debug("Server : " + server + " msg : " + e.getMessage());
 +      timeoutTracker.errorOccured(e);
 +      throw new IOException(e);
 +    } catch (ThriftSecurityException e) {
 +      log.debug("Server : " + server + " msg : " + e.getMessage(), e);
 +      throw new AccumuloSecurityException(e.user, e.code, e);
 +    } catch (TApplicationException e) {
 +      log.debug("Server : " + server + " msg : " + e.getMessage(), e);
 +      throw new AccumuloServerException(server, e);
 +    } catch (NoSuchScanIDException e) {
 +      log.debug("Server : " + server + " msg : " + e.getMessage(), e);
 +      throw new IOException(e);
 +    } catch (TException e) {
 +      log.debug("Server : " + server + " msg : " + e.getMessage(), e);
 +      timeoutTracker.errorOccured(e);
 +      throw new IOException(e);
 +    } finally {
 +      ThriftTransportPool.getInstance().returnTransport(transport);
 +    }
 +  }
 +  
 +  static int sumSizes(Collection<List<Range>> values) {
 +    int sum = 0;
 +    
 +    for (List<Range> list : values) {
 +      sum += list.size();
 +    }
 +    
 +    return sum;
 +  }
 +}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/0bd62e39/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java
----------------------------------------------------------------------
diff --cc core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java
index ca182a6,0000000..5b24f4a
mode 100644,000000..100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java
@@@ -1,1009 -1,0 +1,1009 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.core.client.impl;
 +
 +import java.io.IOException;
 +import java.lang.management.CompilationMXBean;
 +import java.lang.management.GarbageCollectorMXBean;
 +import java.lang.management.ManagementFactory;
 +import java.util.ArrayList;
 +import java.util.Collections;
 +import java.util.HashMap;
 +import java.util.HashSet;
 +import java.util.Iterator;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Map.Entry;
 +import java.util.Set;
 +import java.util.Timer;
 +import java.util.TimerTask;
 +import java.util.concurrent.ExecutorService;
 +import java.util.concurrent.TimeUnit;
 +import java.util.concurrent.atomic.AtomicLong;
 +
 +import org.apache.accumulo.core.Constants;
 +import org.apache.accumulo.core.client.AccumuloException;
 +import org.apache.accumulo.core.client.AccumuloSecurityException;
 +import org.apache.accumulo.core.client.BatchWriterConfig;
 +import org.apache.accumulo.core.client.Instance;
 +import org.apache.accumulo.core.client.MutationsRejectedException;
 +import org.apache.accumulo.core.client.TableDeletedException;
 +import org.apache.accumulo.core.client.TableNotFoundException;
 +import org.apache.accumulo.core.client.TableOfflineException;
 +import org.apache.accumulo.core.client.TimedOutException;
 +import org.apache.accumulo.core.client.impl.TabletLocator.TabletServerMutations;
 +import org.apache.accumulo.core.client.impl.thrift.SecurityErrorCode;
 +import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException;
 +import org.apache.accumulo.core.conf.Property;
 +import org.apache.accumulo.core.constraints.Violations;
 +import org.apache.accumulo.core.data.ConstraintViolationSummary;
 +import org.apache.accumulo.core.data.KeyExtent;
 +import org.apache.accumulo.core.data.Mutation;
 +import org.apache.accumulo.core.data.thrift.TMutation;
 +import org.apache.accumulo.core.data.thrift.UpdateErrors;
 +import org.apache.accumulo.core.master.state.tables.TableState;
 +import org.apache.accumulo.core.security.thrift.TCredentials;
 +import org.apache.accumulo.core.tabletserver.thrift.ConstraintViolationException;
 +import org.apache.accumulo.core.tabletserver.thrift.NoSuchScanIDException;
 +import org.apache.accumulo.core.tabletserver.thrift.NotServingTabletException;
 +import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
 +import org.apache.accumulo.core.util.SimpleThreadPool;
 +import org.apache.accumulo.core.util.ThriftUtil;
 +import org.apache.accumulo.trace.instrument.Span;
 +import org.apache.accumulo.trace.instrument.Trace;
 +import org.apache.accumulo.trace.instrument.Tracer;
 +import org.apache.accumulo.trace.thrift.TInfo;
 +import org.apache.hadoop.io.Text;
 +import org.apache.log4j.Logger;
 +import org.apache.thrift.TApplicationException;
 +import org.apache.thrift.TException;
 +import org.apache.thrift.TServiceClient;
 +import org.apache.thrift.transport.TTransport;
 +import org.apache.thrift.transport.TTransportException;
 +
 +/*
 + * Differences from previous TabletServerBatchWriter
 + *   + As background threads finish sending mutations to tablet servers they decrement memory usage
 + *   + Once the queue of unprocessed mutations reaches 50% it is always pushed to the background threads, 
 + *      even if they are currently processing... new mutations are merged with mutations currently 
 + *      processing in the background
 + *   + Failed mutations are held for 1000ms and then re-added to the unprocessed queue
 + *   + Flush holds adding of new mutations so it does not wait indefinitely
 + * 
 + * Considerations
 + *   + All background threads must catch and note Throwable
 + *   + mutations for a single tablet server are only processed by one thread concurrently (if new mutations 
 + *      come in for a tablet server while one thread is processing mutations for it, no other thread should 
 + *      start processing those mutations)
 + *   
 + * Memory accounting
 + *   + when a mutation enters the system memory is incremented
 + *   + when a mutation successfully leaves the system memory is decremented
 + * 
 + * 
 + * 
 + */
 +
 +public class TabletServerBatchWriter {
 +  
 +  private static final Logger log = Logger.getLogger(TabletServerBatchWriter.class);
 +  
 +  private long totalMemUsed = 0;
 +  private long maxMem;
 +  private MutationSet mutations;
 +  private boolean flushing;
 +  private boolean closed;
 +  private MutationWriter writer;
 +  private FailedMutations failedMutations;
 +  
 +  private Instance instance;
 +  private TCredentials credentials;
 +  
 +  private Violations violations;
 +  private Map<KeyExtent,Set<SecurityErrorCode>> authorizationFailures;
 +  private HashSet<String> serverSideErrors;
 +  private int unknownErrors = 0;
 +  private boolean somethingFailed = false;
 +  
 +  private Timer jtimer;
 +  
 +  private long maxLatency;
 +  
 +  private long timeout;
 +  
 +  private long lastProcessingStartTime;
 +  
 +  private long totalAdded = 0;
 +  private AtomicLong totalSent = new AtomicLong(0);
 +  private AtomicLong totalBinned = new AtomicLong(0);
 +  private AtomicLong totalBinTime = new AtomicLong(0);
 +  private AtomicLong totalSendTime = new AtomicLong(0);
 +  private long startTime = 0;
 +  private long initialGCTimes;
 +  private long initialCompileTimes;
 +  private double initialSystemLoad;
 +  
 +  private int tabletServersBatchSum = 0;
 +  private int tabletBatchSum = 0;
 +  private int numBatches = 0;
 +  private int maxTabletBatch = Integer.MIN_VALUE;
 +  private int minTabletBatch = Integer.MAX_VALUE;
 +  private int minTabletServersBatch = Integer.MAX_VALUE;
 +  private int maxTabletServersBatch = Integer.MIN_VALUE;
 +  
 +  private Throwable lastUnknownError = null;
 +  
 +  private Map<String,TimeoutTracker> timeoutTrackers;
 +  
 +  private static class TimeoutTracker {
 +    
 +    String server;
 +    long timeOut;
 +    long activityTime;
 +    Long firstErrorTime = null;
 +    
 +    TimeoutTracker(String server, long timeOut) {
 +      this.timeOut = timeOut;
 +      this.server = server;
 +    }
 +    
 +    void startingWrite() {
 +      activityTime = System.currentTimeMillis();
 +    }
 +    
 +    void madeProgress() {
 +      activityTime = System.currentTimeMillis();
 +      firstErrorTime = null;
 +    }
 +    
 +    void wroteNothing() {
 +      if (firstErrorTime == null) {
 +        firstErrorTime = activityTime;
 +      } else if (System.currentTimeMillis() - firstErrorTime > timeOut) {
 +        throw new TimedOutException(Collections.singleton(server));
 +      }
 +    }
 +    
 +    void errorOccured(Exception e) {
 +      wroteNothing();
 +    }
 +    
 +    public long getTimeOut() {
 +      return timeOut;
 +    }
 +  }
 +  
 +  /**
 +   * Creates a batch writer and starts its background machinery: the latency timer, the pool of send threads and the task that requeues failed mutations.
 +   * 
 +   * @param instance
 +   *          the instance mutations are written to
 +   * @param credentials
 +   *          the credentials passed along when binning and sending mutations
 +   * @param config
 +   *          supplies the max memory, max latency, timeout and number of write threads; a max latency of zero or less disables the latency task
 +   */
 +  public TabletServerBatchWriter(Instance instance, TCredentials credentials, BatchWriterConfig config) {
 +    this.instance = instance;
 +    this.maxMem = config.getMaxMemory();
 +    long configuredLatency = config.getMaxLatency(TimeUnit.MILLISECONDS);
 +    this.maxLatency = configuredLatency <= 0 ? Long.MAX_VALUE : configuredLatency;
 +    this.credentials = credentials;
 +    this.timeout = config.getTimeout(TimeUnit.MILLISECONDS);
 +    mutations = new MutationSet();
 +
 +    violations = new Violations();
 +
 +    authorizationFailures = new HashMap<KeyExtent,Set<SecurityErrorCode>>();
 +    serverSideErrors = new HashSet<String>();
 +
 +    lastProcessingStartTime = System.currentTimeMillis();
 +
 +    // the timer must exist before FailedMutations is constructed, since that task schedules itself on it
 +    jtimer = new Timer("BatchWriterLatencyTimer", true);
 +
 +    writer = new MutationWriter(config.getMaxWriteThreads());
 +    failedMutations = new FailedMutations();
 +
 +    timeoutTrackers = Collections.synchronizedMap(new HashMap<String,TabletServerBatchWriter.TimeoutTracker>());
 +
 +    if (this.maxLatency != Long.MAX_VALUE) {
 +      // Timer.schedule rejects a period of zero, which maxLatency / 4 would produce for latencies below 4ms
 +      long checkPeriod = Math.max(1, this.maxLatency / 4);
 +      jtimer.schedule(new TimerTask() {
 +        @Override
 +        public void run() {
 +          try {
 +            synchronized (TabletServerBatchWriter.this) {
 +              if ((System.currentTimeMillis() - lastProcessingStartTime) > TabletServerBatchWriter.this.maxLatency)
 +                startProcessing();
 +            }
 +          } catch (Throwable t) {
 +            updateUnknownErrors("Max latency task failed " + t.getMessage(), t);
 +          }
 +        }
 +      }, 0, checkPeriod);
 +    }
 +  }
 +  
 +  /**
 +   * Hands the currently buffered mutations to the writer and starts a fresh, empty buffer. Does nothing when the buffer reports no memory used.
 +   */
 +  private synchronized void startProcessing() {
 +    if (mutations.getMemoryUsed() != 0) {
 +      lastProcessingStartTime = System.currentTimeMillis();
 +      writer.addMutations(mutations);
 +      mutations = new MutationSet();
 +    }
 +  }
 +  
 +  /**
 +   * Lowers the memory accounting by the given amount and wakes every thread waiting on this writer.
 +   * 
 +   * @param amount
 +   *          number of bytes to subtract from the total memory used
 +   */
 +  private synchronized void decrementMemUsed(long amount) {
 +    totalMemUsed = totalMemUsed - amount;
 +    notifyAll();
 +  }
 +  
 +  /**
 +   * Buffers a copy of the given mutation for the given table, blocking while the writer is over its memory limit or a flush is in progress.
 +   * 
 +   * @param table
 +   *          the table the mutation is queued under
 +   * @param m
 +   *          the mutation to add; it is copied, so the caller may reuse the object afterwards
 +   * @throws MutationsRejectedException
 +   *           if an earlier failure has been recorded
 +   * @throws IllegalStateException
 +   *           if the writer is closed
 +   * @throws IllegalArgumentException
 +   *           if the mutation is empty
 +   */
 +  public synchronized void addMutation(String table, Mutation m) throws MutationsRejectedException {
 +
 +    if (closed)
 +      throw new IllegalStateException("Closed");
 +    if (m.size() == 0)
 +      throw new IllegalArgumentException("Can not add empty mutations");
 +
 +    checkForFailures();
 +
 +    while ((totalMemUsed > maxMem || flushing) && !somethingFailed) {
 +      waitRTE();
 +    }
 +
 +    // do checks again since things could have changed while waiting and not holding lock
 +    if (closed)
 +      throw new IllegalStateException("Closed");
 +    checkForFailures();
 +
 +    // the first mutation added captures the baseline for the statistics reported by logStats()
 +    if (startTime == 0) {
 +      startTime = System.currentTimeMillis();
 +
 +      List<GarbageCollectorMXBean> gcmBeans = ManagementFactory.getGarbageCollectorMXBeans();
 +      for (GarbageCollectorMXBean garbageCollectorMXBean : gcmBeans) {
 +        initialGCTimes += garbageCollectorMXBean.getCollectionTime();
 +      }
 +
 +      // getCompilationMXBean() returns null when the JVM has no compilation system
 +      CompilationMXBean compMxBean = ManagementFactory.getCompilationMXBean();
 +      if (compMxBean != null && compMxBean.isCompilationTimeMonitoringSupported()) {
 +        initialCompileTimes = compMxBean.getTotalCompilationTime();
 +      }
 +
 +      initialSystemLoad = ManagementFactory.getOperatingSystemMXBean().getSystemLoadAverage();
 +    }
 +
 +    // create a copy of mutation so that after this method returns the user
 +    // is free to reuse the mutation object, like calling readFields... this
 +    // is important for the case where a mutation is passed from map to reduce
 +    // to batch writer... the map reduce code will keep passing the same mutation
 +    // object into the reduce method
 +    m = new Mutation(m);
 +
 +    totalMemUsed += m.estimatedMemoryUsed();
 +    mutations.addMutation(table, m);
 +    totalAdded++;
 +
 +    // once the buffer holds half of the allowed memory, push it to the background threads
 +    if (mutations.getMemoryUsed() >= maxMem / 2) {
 +      startProcessing();
 +      checkForFailures();
 +    }
 +  }
 +  
 +  public void addMutation(String table, Iterator<Mutation> iterator) throws MutationsRejectedException {
 +    while (iterator.hasNext()) {
 +      addMutation(table, iterator.next());
 +    }
 +  }
 +  
 +  /**
 +   * Pushes all buffered mutations to the background threads and waits until the memory accounting drops to zero or a failure is recorded. If another thread is
 +   * already flushing, this call only waits for that flush to finish.
 +   * 
 +   * @throws MutationsRejectedException
 +   *           if a failure has been recorded
 +   * @throws IllegalStateException
 +   *           if the writer is closed
 +   */
 +  public synchronized void flush() throws MutationsRejectedException {
 +
 +    if (closed)
 +      throw new IllegalStateException("Closed");
 +
 +    Span span = Trace.start("flush");
 +
 +    try {
 +      checkForFailures();
 +
 +      if (flushing) {
 +        // some other thread is currently flushing, so wait
 +        while (flushing && !somethingFailed)
 +          waitRTE();
 +
 +        checkForFailures();
 +
 +        return;
 +      }
 +
 +      flushing = true;
 +      try {
 +        startProcessing();
 +        checkForFailures();
 +
 +        while (totalMemUsed > 0 && !somethingFailed) {
 +          waitRTE();
 +        }
 +      } finally {
 +        // always release the flushing flag, even when leaving through an exception (e.g. an interrupted wait);
 +        // otherwise threads blocked in addMutation() would wait on it forever
 +        flushing = false;
 +        this.notifyAll();
 +      }
 +
 +      checkForFailures();
 +    } finally {
 +      span.stop();
 +    }
 +  }
 +  
 +  public synchronized void close() throws MutationsRejectedException {
 +    
 +    if (closed)
 +      return;
 +    
 +    Span span = Trace.start("close");
 +    try {
 +      closed = true;
 +      
 +      startProcessing();
 +      
 +      while (totalMemUsed > 0 && !somethingFailed) {
 +        waitRTE();
 +      }
 +      
 +      logStats();
 +      
 +      checkForFailures();
 +    } finally {
 +      // make a best effort to release these resources
 +      writer.sendThreadPool.shutdownNow();
 +      jtimer.cancel();
 +      span.stop();
 +    }
 +  }
 +  
 +  /**
 +   * Writes writer, background-thread and JVM statistics to the log at trace level. Nothing is gathered or logged when trace logging is disabled.
 +   */
 +  private void logStats() {
 +    // everything below only feeds trace output, so skip the MXBean queries when it would be discarded
 +    if (!log.isTraceEnabled())
 +      return;
 +
 +    long finishTime = System.currentTimeMillis();
 +
 +    long finalGCTimes = 0;
 +    List<GarbageCollectorMXBean> gcmBeans = ManagementFactory.getGarbageCollectorMXBeans();
 +    for (GarbageCollectorMXBean garbageCollectorMXBean : gcmBeans) {
 +      finalGCTimes += garbageCollectorMXBean.getCollectionTime();
 +    }
 +
 +    // getCompilationMXBean() returns null when the JVM has no compilation system
 +    CompilationMXBean compMxBean = ManagementFactory.getCompilationMXBean();
 +    boolean compileTimeSupported = compMxBean != null && compMxBean.isCompilationTimeMonitoringSupported();
 +    long finalCompileTimes = 0;
 +    if (compileTimeSupported) {
 +      finalCompileTimes = compMxBean.getTotalCompilationTime();
 +    }
 +
 +    // floating point division: a zero send time or elapsed time yields Infinity/NaN rather than an exception
 +    double averageRate = totalSent.get() / (totalSendTime.get() / 1000.0);
 +    double overallRate = totalAdded / ((finishTime - startTime) / 1000.0);
 +
 +    double finalSystemLoad = ManagementFactory.getOperatingSystemMXBean().getSystemLoadAverage();
 +
 +    log.trace("");
 +    log.trace("TABLET SERVER BATCH WRITER STATISTICS");
 +    log.trace(String.format("Added                : %,10d mutations", totalAdded));
 +    log.trace(String.format("Sent                 : %,10d mutations", totalSent.get()));
 +    log.trace(String.format("Resent percentage   : %10.2f%s", (totalSent.get() - totalAdded) / (double) totalAdded * 100.0, "%"));
 +    log.trace(String.format("Overall time         : %,10.2f secs", (finishTime - startTime) / 1000.0));
 +    log.trace(String.format("Overall send rate    : %,10.2f mutations/sec", overallRate));
 +    log.trace(String.format("Send efficiency      : %10.2f%s", overallRate / averageRate * 100.0, "%"));
 +    log.trace("");
 +    log.trace("BACKGROUND WRITER PROCESS STATISTICS");
 +    log.trace(String.format("Total send time      : %,10.2f secs %6.2f%s", totalSendTime.get() / 1000.0, 100.0 * totalSendTime.get()
 +        / (finishTime - startTime), "%"));
 +    log.trace(String.format("Average send rate    : %,10.2f mutations/sec", averageRate));
 +    log.trace(String.format("Total bin time       : %,10.2f secs %6.2f%s", totalBinTime.get() / 1000.0,
 +        100.0 * totalBinTime.get() / (finishTime - startTime), "%"));
 +    log.trace(String.format("Average bin rate     : %,10.2f mutations/sec", totalBinned.get() / (totalBinTime.get() / 1000.0)));
 +    log.trace(String.format("tservers per batch   : %,8.2f avg  %,6d min %,6d max", (tabletServersBatchSum / (double) numBatches), minTabletServersBatch,
 +        maxTabletServersBatch));
 +    log.trace(String.format("tablets per batch    : %,8.2f avg  %,6d min %,6d max", (tabletBatchSum / (double) numBatches), minTabletBatch, maxTabletBatch));
 +    log.trace("");
 +    log.trace("SYSTEM STATISTICS");
 +    log.trace(String.format("JVM GC Time          : %,10.2f secs", ((finalGCTimes - initialGCTimes) / 1000.0)));
 +    if (compileTimeSupported) {
 +      log.trace(String.format("JVM Compile Time     : %,10.2f secs", ((finalCompileTimes - initialCompileTimes) / 1000.0)));
 +    }
 +    log.trace(String.format("System load average : initial=%6.2f final=%6.2f", initialSystemLoad, finalSystemLoad));
 +  }
 +  
 +  /**
 +   * Adds to the running totals of mutations sent and time spent sending.
 +   * 
 +   * @param count
 +   *          number of mutations sent
 +   * @param time
 +   *          time the send took, added as-is to the total send time
 +   */
 +  private void updateSendStats(long count, long time) {
 +    totalSendTime.addAndGet(time);
 +    totalSent.addAndGet(count);
 +  }
 +  
 +  /**
 +   * Adds to the running binning totals and folds the binned result into the per-batch statistics.
 +   * 
 +   * @param count
 +   *          number of mutations that were binned
 +   * @param time
 +   *          time the binning took, added as-is to the total bin time
 +   * @param binnedMutations
 +   *          the binning result, keyed by tablet server
 +   */
 +  public void updateBinningStats(int count, long time, Map<String,TabletServerMutations> binnedMutations) {
 +    totalBinned.addAndGet(count);
 +    totalBinTime.addAndGet(time);
 +    updateBatchStats(binnedMutations);
 +  }
 +  
 +  /**
 +   * Updates the sum, minimum and maximum of tablet servers and of tablets per batch, and counts the batch.
 +   * 
 +   * @param binnedMutations
 +   *          the binning result for one batch, keyed by tablet server
 +   */
 +  private synchronized void updateBatchStats(Map<String,TabletServerMutations> binnedMutations) {
 +    int serverCount = binnedMutations.size();
 +
 +    tabletServersBatchSum += serverCount;
 +    minTabletServersBatch = Math.min(minTabletServersBatch, serverCount);
 +    maxTabletServersBatch = Math.max(maxTabletServersBatch, serverCount);
 +
 +    // each tablet server's mutations are keyed by tablet, so the map sizes add up to the tablet count
 +    int tabletCount = 0;
 +    for (TabletServerMutations serverMutations : binnedMutations.values()) {
 +      tabletCount += serverMutations.getMutations().size();
 +    }
 +
 +    tabletBatchSum += tabletCount;
 +    minTabletBatch = Math.min(minTabletBatch, tabletCount);
 +    maxTabletBatch = Math.max(maxTabletBatch, tabletCount);
 +
 +    numBatches++;
 +  }
 +  
 +  /**
 +   * Waits on this writer's monitor, converting an interrupt into a {@link RuntimeException}. The caller must hold the monitor.
 +   * 
 +   * @throws RuntimeException
 +   *           wrapping the {@link InterruptedException} if the thread is interrupted while waiting
 +   */
 +  private void waitRTE() {
 +    try {
 +      wait();
 +    } catch (InterruptedException e) {
 +      // restore the interrupt status so code further up the stack can still observe the interruption
 +      Thread.currentThread().interrupt();
 +      throw new RuntimeException(e);
 +    }
 +  }
 +  
 +  // BEGIN code for handling unrecoverable errors
 +  
 +  /**
 +   * Records constraint violations as a failure and wakes waiting threads. An empty list is ignored.
 +   * 
 +   * @param cvsList
 +   *          the violation summaries to record
 +   */
 +  private void updatedConstraintViolations(List<ConstraintViolationSummary> cvsList) {
 +    if (cvsList.isEmpty()) {
 +      return;
 +    }
 +
 +    synchronized (this) {
 +      somethingFailed = true;
 +      violations.add(cvsList);
 +      this.notifyAll();
 +    }
 +  }
 +
 +  /**
 +   * Records the same security error code for every one of the given extents.
 +   * 
 +   * @param keySet
 +   *          the extents that failed
 +   * @param code
 +   *          the error code to associate with each extent
 +   */
 +  private void updateAuthorizationFailures(Set<KeyExtent> keySet, SecurityErrorCode code) {
 +    HashMap<KeyExtent,SecurityErrorCode> failuresByExtent = new HashMap<KeyExtent,SecurityErrorCode>();
 +    for (KeyExtent extent : keySet) {
 +      failuresByExtent.put(extent, code);
 +    }
 +
 +    updateAuthorizationFailures(failuresByExtent);
 +  }
 +  
 +  /**
 +   * Records authorization failures and wakes waiting threads. An empty map is ignored. Before recording, the table cache is cleared and each affected table
 +   * is checked for existence.
 +   * 
 +   * @param authorizationFailures
 +   *          the error code reported for each extent
 +   * @throws TableDeletedException
 +   *           if one of the affected tables no longer exists
 +   */
 +  private void updateAuthorizationFailures(Map<KeyExtent,SecurityErrorCode> authorizationFailures) {
 +    if (authorizationFailures.isEmpty()) {
 +      return;
 +    }
 +
 +    // was a table deleted?
 +    HashSet<String> affectedTableIds = new HashSet<String>();
 +    for (KeyExtent extent : authorizationFailures.keySet()) {
 +      affectedTableIds.add(extent.getTableId().toString());
 +    }
 +
 +    Tables.clearCache(instance);
 +    for (String tableId : affectedTableIds) {
 +      if (!Tables.exists(instance, tableId)) {
 +        throw new TableDeletedException(tableId);
 +      }
 +    }
 +
 +    synchronized (this) {
 +      somethingFailed = true;
 +      mergeAuthorizationFailures(this.authorizationFailures, authorizationFailures);
 +      this.notifyAll();
 +    }
 +  }
 +  
 +  /**
 +   * Adds each extent's error code from {@code addition} to that extent's set of codes in {@code source}, creating the set when the extent is new.
 +   * 
 +   * @param source
 +   *          the accumulated failures, modified in place
 +   * @param addition
 +   *          the new failures to merge in
 +   */
 +  private void mergeAuthorizationFailures(Map<KeyExtent,Set<SecurityErrorCode>> source, Map<KeyExtent,SecurityErrorCode> addition) {
 +    for (Entry<KeyExtent,SecurityErrorCode> failure : addition.entrySet()) {
 +      KeyExtent extent = failure.getKey();
 +
 +      Set<SecurityErrorCode> codes = source.get(extent);
 +      if (codes == null) {
 +        codes = new HashSet<SecurityErrorCode>();
 +        source.put(extent, codes);
 +      }
 +
 +      codes.add(failure.getValue());
 +    }
 +  }
 +  
 +  private synchronized void updateServerErrors(String server, Exception e) {
 +    somethingFailed = true;
 +    this.serverSideErrors.add(server);
 +    this.notifyAll();
 +    log.error("Server side error on " + server);
 +  }
 +  
 +  /**
 +   * Records an otherwise unclassified error as a failure, remembers it as the last unknown error and wakes waiting threads. Deleted-table, offline-table and
 +   * timeout errors are logged at debug level, everything else at error level.
 +   * 
 +   * @param msg
 +   *          the message to log
 +   * @param t
 +   *          the error to record and log
 +   */
 +  private synchronized void updateUnknownErrors(String msg, Throwable t) {
 +    somethingFailed = true;
 +    unknownErrors++;
 +    this.lastUnknownError = t;
 +    this.notifyAll();
 +
 +    boolean knownCause = t instanceof TableDeletedException || t instanceof TableOfflineException || t instanceof TimedOutException;
 +    if (!knownCause) {
 +      log.error(msg, t);
 +    } else {
 +      log.debug(msg, t); // this is not unknown
 +    }
 +  }
 +  
 +  /**
 +   * Throws if any failure has been recorded, bundling the constraint violations, authorization failures, server side errors and unknown error count. The
 +   * thrift security error codes are converted by name to the client facing codes.
 +   * 
 +   * @throws MutationsRejectedException
 +   *           if something has failed
 +   */
 +  private void checkForFailures() throws MutationsRejectedException {
 +    if (!somethingFailed) {
 +      return;
 +    }
 +
 +    List<ConstraintViolationSummary> cvsList = violations.asList();
 +
 +    HashMap<KeyExtent,Set<org.apache.accumulo.core.client.security.SecurityErrorCode>> af = new HashMap<KeyExtent,Set<org.apache.accumulo.core.client.security.SecurityErrorCode>>();
 +    for (Entry<KeyExtent,Set<SecurityErrorCode>> failure : authorizationFailures.entrySet()) {
 +      HashSet<org.apache.accumulo.core.client.security.SecurityErrorCode> clientCodes = new HashSet<org.apache.accumulo.core.client.security.SecurityErrorCode>();
 +
 +      for (SecurityErrorCode thriftCode : failure.getValue()) {
 +        clientCodes.add(org.apache.accumulo.core.client.security.SecurityErrorCode.valueOf(thriftCode.name()));
 +      }
 +
 +      af.put(failure.getKey(), clientCodes);
 +    }
 +
 +    throw new MutationsRejectedException(cvsList, af, serverSideErrors, unknownErrors, lastUnknownError);
 +  }
 +  
 +  // END code for handling unrecoverable errors
 +  
 +  // BEGIN code for handling failed mutations
 +  
 +  /**
 +   * Add mutations that previously failed back into the mix
 +   * 
 +   * @param failedMutations
 +   *          the previously failed mutations to merge back into the set of unprocessed mutations; processing is started right away when that set has reached
 +   *          half of the max memory or when the writer is closed or flushing
 +   */
 +  private synchronized void addFailedMutations(MutationSet failedMutations) throws Exception {
 +    mutations.addAll(failedMutations);
 +    if (mutations.getMemoryUsed() >= maxMem / 2 || closed || flushing) {
 +      startProcessing();
 +    }
 +  }
 +  
 +  private class FailedMutations extends TimerTask {
 +    
 +    private MutationSet recentFailures = null;
 +    private long initTime;
 +    
 +    FailedMutations() {
 +      jtimer.schedule(this, 0, 500);
 +    }
 +    
 +    private MutationSet init() {
 +      if (recentFailures == null) {
 +        recentFailures = new MutationSet();
 +        initTime = System.currentTimeMillis();
 +      }
 +      return recentFailures;
 +    }
 +    
 +    synchronized void add(String table, ArrayList<Mutation> tableFailures) {
 +      init().addAll(table, tableFailures);
 +    }
 +    
 +    synchronized void add(MutationSet failures) {
 +      init().addAll(failures);
 +    }
 +    
 +    synchronized void add(String location, TabletServerMutations tsm) {
 +      init();
 +      for (Entry<KeyExtent,List<Mutation>> entry : tsm.getMutations().entrySet()) {
 +        recentFailures.addAll(entry.getKey().getTableId().toString(), entry.getValue());
 +      }
 +      
 +    }
 +    
 +    @Override
 +    public void run() {
 +      try {
 +        MutationSet rf = null;
 +        
 +        synchronized (this) {
 +          if (recentFailures != null && System.currentTimeMillis() - initTime > 1000) {
 +            rf = recentFailures;
 +            recentFailures = null;
 +          }
 +        }
 +        
 +        if (rf != null) {
 +          if (log.isTraceEnabled())
 +            log.trace("requeuing " + rf.size() + " failed mutations");
 +          addFailedMutations(rf);
 +        }
 +      } catch (Throwable t) {
 +        updateUnknownErrors("Failed to requeue failed mutations " + t.getMessage(), t);
 +        cancel();
 +      }
 +    }
 +  }
 +  
 +  // END code for handling failed mutations
 +  
 +  // BEGIN code for sending mutations to tablet servers using background threads
 +  
 +  private class MutationWriter {
 +    
 +    private static final int MUTATION_BATCH_SIZE = 1 << 17;
 +    private ExecutorService sendThreadPool;
 +    private Map<String,TabletServerMutations> serversMutations;
 +    private Set<String> queued;
 +    private Map<String,TabletLocator> locators;
 +    
 +    /**
 +     * Creates the writer with empty bookkeeping and a thread pool for sending.
 +     * 
 +     * @param numSendThreads
 +     *          size passed to the send thread pool
 +     */
 +    public MutationWriter(int numSendThreads) {
 +      this.serversMutations = new HashMap<String,TabletServerMutations>();
 +      this.queued = new HashSet<String>();
 +      this.locators = new HashMap<String,TabletLocator>();
 +      this.sendThreadPool = new SimpleThreadPool(numSendThreads, this.getClass().getName());
 +    }
 +    
 +    /**
 +     * Returns the locator cached for the table, creating and caching a timeout-wrapped one on first use.
 +     * 
 +     * @param tableId
 +     *          id of the table to locate tablets for
 +     * @return the cached or newly created locator
 +     */
 +    private TabletLocator getLocator(String tableId) {
 +      TabletLocator cached = locators.get(tableId);
 +      if (cached != null) {
 +        return cached;
 +      }
 +
 +      TabletLocator base = TabletLocator.getInstance(instance, new Text(tableId));
 +      TabletLocator created = new TimeoutTabletLocator(base, timeout);
 +      locators.put(tableId, created);
 +      return created;
 +    }
 +    
 +    /**
 +     * Assigns the given mutations to tablet servers, table by table. Mutations that could not be binned are queued as failed mutations. If every mutation of a
 +     * table failed to bin, the table is checked for having been deleted or taken offline. When an error is caught, it is recorded (or the whole set is queued
 +     * for retry) and {@code binnedMutations} is cleared.
 +     * 
 +     * @param mutationsToProcess
 +     *          the mutations to bin
 +     * @param binnedMutations
 +     *          receives the mutations grouped by tablet server; left empty when an error occurred
 +     */
 +    private void binMutations(MutationSet mutationsToProcess, Map<String,TabletServerMutations> binnedMutations) {
 +      try {
 +        for (Entry<String,List<Mutation>> tableEntry : mutationsToProcess.getMutations().entrySet()) {
 +          String table = tableEntry.getKey();
 +          TabletLocator locator = getLocator(table);
 +          List<Mutation> tableMutations = tableEntry.getValue();
 +
 +          if (tableMutations == null) {
 +            continue;
 +          }
 +
 +          ArrayList<Mutation> tableFailures = new ArrayList<Mutation>();
 +          locator.binMutations(tableMutations, binnedMutations, tableFailures, credentials);
 +
 +          if (tableFailures.size() > 0) {
 +            failedMutations.add(table, tableFailures);
 +
 +            // nothing at all could be binned for this table, find out whether the table is gone or offline
 +            if (tableFailures.size() == tableMutations.size()) {
 +              if (!Tables.exists(instance, table)) {
 +                throw new TableDeletedException(table);
 +              } else if (Tables.getTableState(instance, table) == TableState.OFFLINE) {
 +                throw new TableOfflineException(instance, table);
 +              }
 +            }
 +          }
 +        }
 +        return;
 +      } catch (AccumuloServerException ase) {
 +        updateServerErrors(ase.getServer(), ase);
 +      } catch (AccumuloException ae) {
 +        // assume an IOError communicating with !METADATA tablet
 +        failedMutations.add(mutationsToProcess);
 +      } catch (AccumuloSecurityException e) {
 +        updateAuthorizationFailures(Collections.singletonMap(new KeyExtent(new Text(Constants.METADATA_TABLE_ID), null, null),
 +            SecurityErrorCode.valueOf(e.getSecurityErrorCode().name())));
 +      } catch (TableDeletedException e) {
 +        updateUnknownErrors(e.getMessage(), e);
 +      } catch (TableOfflineException e) {
 +        updateUnknownErrors(e.getMessage(), e);
 +      } catch (TableNotFoundException e) {
 +        updateUnknownErrors(e.getMessage(), e);
 +      }
 +
 +      // an error occurred, so discard whatever was binned before it
 +      binnedMutations.clear();
 +
 +    }
 +    
 +    /**
 +     * Bins the given mutations by tablet server, records how long the binning took and queues the result for sending.
 +     * 
 +     * @param mutationsToSend
 +     *          the mutations to bin and send
 +     */
 +    void addMutations(MutationSet mutationsToSend) {
 +      Map<String,TabletServerMutations> byServer = new HashMap<String,TabletServerMutations>();
 +
 +      Span span = Trace.start("binMutations");
 +      try {
 +        long binStart = System.currentTimeMillis();
 +        binMutations(mutationsToSend, byServer);
 +        long binEnd = System.currentTimeMillis();
 +
 +        updateBinningStats(mutationsToSend.size(), (binEnd - binStart), byServer);
 +      } finally {
 +        span.stop();
 +      }
 +
 +      addMutations(byServer);
 +    }
 +    
 +    /**
 +     * Merges the binned mutations into the per-server queues and submits a send task for every server that does not already have one queued.
 +     * 
 +     * @param binnedMutations
 +     *          mutations grouped by tablet server
 +     */
 +    private synchronized void addMutations(Map<String,TabletServerMutations> binnedMutations) {
 +
 +      int count = 0;
 +
 +      // merge mutations into existing mutations for a tablet server
 +      for (Entry<String,TabletServerMutations> serverEntry : binnedMutations.entrySet()) {
 +        String server = serverEntry.getKey();
 +        TabletServerMutations incoming = serverEntry.getValue();
 +
 +        TabletServerMutations existing = serversMutations.get(server);
 +
 +        if (existing == null) {
 +          serversMutations.put(server, incoming);
 +        } else {
 +          for (Entry<KeyExtent,List<Mutation>> tabletEntry : incoming.getMutations().entrySet()) {
 +            KeyExtent extent = tabletEntry.getKey();
 +            for (Mutation m : tabletEntry.getValue()) {
 +              existing.addMutation(extent, m);
 +            }
 +          }
 +        }
 +
 +        // the count is only needed for the trace message below
 +        if (log.isTraceEnabled()) {
 +          for (List<Mutation> tabletMutations : incoming.getMutations().values()) {
 +            count += tabletMutations.size();
 +          }
 +        }
 +
 +      }
 +
 +      if (count > 0 && log.isTraceEnabled()) {
 +        log.trace(String.format("Started sending %,d mutations to %,d tablet servers", count, binnedMutations.keySet().size()));
 +      }
 +
 +      // randomize order of servers
 +      ArrayList<String> servers = new ArrayList<String>(binnedMutations.keySet());
 +      Collections.shuffle(servers);
 +
 +      for (String server : servers) {
 +        if (queued.contains(server)) {
 +          continue;
 +        }
 +        sendThreadPool.submit(Trace.wrap(new SendTask(server)));
 +        queued.add(server);
 +      }
 +    }
 +    
 +    /**
 +     * Removes and returns the mutations queued for a server. When there are none, the server is also dropped from the set of queued servers.
 +     * 
 +     * @param server
 +     *          the tablet server to fetch mutations for
 +     * @return the queued mutations, or null when nothing is queued for the server
 +     */
 +    private synchronized TabletServerMutations getMutationsToSend(String server) {
 +      TabletServerMutations pending = serversMutations.remove(server);
 +      if (pending != null) {
 +        return pending;
 +      }
 +
 +      queued.remove(server);
 +      return null;
 +    }
 +    
 +    /**
 +     * Sends the mutations queued for one tablet server, and keeps sending for as long as more are queued for it.
 +     */
 +    class SendTask implements Runnable {
 +
 +      private String location;
 +
 +      SendTask(String server) {
 +        this.location = server;
 +      }
 +
 +      @Override
 +      public void run() {
 +        try {
 +          for (TabletServerMutations batch = getMutationsToSend(location); batch != null; batch = getMutationsToSend(location)) {
 +            send(batch);
 +          }
 +        } catch (Throwable t) {
 +          updateUnknownErrors("Failed to send tablet server " + location + " its batch : " + t.getMessage(), t);
 +        }
 +      }
 +
 +      /**
 +       * Sends one batch to the tablet server. Memory accounting is decremented for what was sent successfully and rejected mutations are queued for retry. On
 +       * an IOException the locator caches of the affected tables are invalidated for this server and the whole batch is queued for retry. The thread is
 +       * renamed for the duration of the send.
 +       * 
 +       * @param tsm
 +       *          the mutations to send, grouped by tablet
 +       */
 +      public void send(TabletServerMutations tsm) throws AccumuloServerException, AccumuloSecurityException {
 +
 +        Thread self = Thread.currentThread();
 +        String oldName = self.getName();
 +
 +        Map<KeyExtent,List<Mutation>> mutationBatch = tsm.getMutations();
 +        try {
 +
 +          long count = 0;
 +          for (List<Mutation> tabletMutations : mutationBatch.values()) {
 +            count += tabletMutations.size();
 +          }
 +          String msg = "sending " + String.format("%,d", count) + " mutations to " + String.format("%,d", mutationBatch.size()) + " tablets at " + location;
 +          self.setName(msg);
 +
 +          Span span = Trace.start("sendMutations");
 +          try {
 +
 +            TimeoutTracker timeoutTracker = timeoutTrackers.get(location);
 +            if (timeoutTracker == null) {
 +              timeoutTracker = new TimeoutTracker(location, timeout);
 +              timeoutTrackers.put(location, timeoutTracker);
 +            }
 +
 +            long st1 = System.currentTimeMillis();
 +            MutationSet failures = sendMutationsToTabletServer(location, mutationBatch, timeoutTracker);
 +            long st2 = System.currentTimeMillis();
 +            if (log.isTraceEnabled()) {
 +              log.trace("sent " + String.format("%,d", count) + " mutations to " + location + " in "
 +                  + String.format("%.2f secs (%,.2f mutations/sec) with %,d failures", (st2 - st1) / 1000.0, count / ((st2 - st1) / 1000.0), failures.size()));
 +            }
 +
 +            // start from the size of the whole batch, then take off whatever the server rejected
 +            long successBytes = 0;
 +            for (List<Mutation> tabletMutations : mutationBatch.values()) {
 +              for (Mutation mutation : tabletMutations) {
 +                successBytes += mutation.estimatedMemoryUsed();
 +              }
 +            }
 +
 +            if (failures.size() > 0) {
 +              failedMutations.add(failures);
 +              successBytes -= failures.getMemoryUsed();
 +            }
 +
 +            updateSendStats(count, st2 - st1);
 +            decrementMemUsed(successBytes);
 +
 +          } finally {
 +            span.stop();
 +          }
 +        } catch (IOException e) {
 +          if (log.isTraceEnabled()) {
 +            log.trace("failed to send mutations to " + location + " : " + e.getMessage());
 +          }
 +
 +          HashSet<String> tables = new HashSet<String>();
 +          for (KeyExtent extent : mutationBatch.keySet()) {
 +            tables.add(extent.getTableId().toString());
 +          }
 +
 +          for (String table : tables) {
 +            TabletLocator.getInstance(instance, new Text(table)).invalidateCache(location);
 +          }
 +
 +          failedMutations.add(location, tsm);
 +        } finally {
 +          self.setName(oldName);
 +        }
 +      }
 +    }
 +    
 +    private MutationSet sendMutationsToTabletServer(String location, Map<KeyExtent,List<Mutation>> tabMuts, TimeoutTracker timeoutTracker) throws IOException,
 +        AccumuloSecurityException, AccumuloServerException { // sends tabMuts to the server at location; returns the mutations collected as failures
 +      if (tabMuts.size() == 0) {
 +        return new MutationSet(); // nothing to send: empty failure set, no client is obtained
 +      }
 +      TInfo tinfo = Tracer.traceInfo();
 +      TTransport transport = null;
 +      
 +      timeoutTracker.startingWrite();
 +      
 +      try {
 +        TabletClientService.Iface client;
 +        
 +        if (timeoutTracker.getTimeOut() < instance.getConfiguration().getTimeInMillis(Property.GENERAL_RPC_TIMEOUT)) // tracker timeout is shorter than the configured RPC timeout, so pass it explicitly
 +          client = ThriftUtil.getTServerClient(location, instance.getConfiguration(), timeoutTracker.getTimeOut());
 +        else
 +          client = ThriftUtil.getTServerClient(location, instance.getConfiguration());
 +        
 +        try {
 +          MutationSet allFailures = new MutationSet();
 +          
 +          if (tabMuts.size() == 1 && tabMuts.values().iterator().next().size() == 1) { // exactly one tablet holding exactly one mutation: single update() call, no update session
 +            Entry<KeyExtent,List<Mutation>> entry = tabMuts.entrySet().iterator().next();
 +            
 +            try {
 +              client.update(tinfo, credentials, entry.getKey().toThrift(), entry.getValue().get(0).toThrift());
 +            } catch (NotServingTabletException e) {
 +              allFailures.addAll(entry.getKey().getTableId().toString(), entry.getValue()); // record the mutation as failed and drop the cached location for this extent
 +              TabletLocator.getInstance(instance, new Text(entry.getKey().getTableId())).invalidateCache(entry.getKey());
 +            } catch (ConstraintViolationException e) {
-               updatedConstraintViolations(Translator.translate(e.violationSummaries, Translator.TCVST));
++              updatedConstraintViolations(Translator.translate(e.violationSummaries, Translators.TCVST));
 +            }
 +            timeoutTracker.madeProgress(); // NOTE(review): also reached after either catch above, not only after a successful update — confirm intended
 +          } else {
 +            
 +            long usid = client.startUpdate(tinfo, credentials); // id passed to every applyUpdates call and to closeUpdate below
 +            
 +            List<TMutation> updates = new ArrayList<TMutation>();
 +            for (Entry<KeyExtent,List<Mutation>> entry : tabMuts.entrySet()) {
 +              long size = 0;
 +              Iterator<Mutation> iter = entry.getValue().iterator();
 +              while (iter.hasNext()) {
 +                while (size < MUTATION_BATCH_SIZE && iter.hasNext()) { // fill one batch until the summed numBytes() reaches MUTATION_BATCH_SIZE or the list ends
 +                  Mutation mutation = iter.next();
 +                  updates.add(mutation.toThrift());
 +                  size += mutation.numBytes();
 +                }
 +                
 +                client.applyUpdates(tinfo, usid, entry.getKey().toThrift(), updates); // send this batch for the current extent, then reset the buffer and byte counter
 +                updates.clear();
 +                size = 0;
 +              }
 +            }
 +            
 +            UpdateErrors updateErrors = client.closeUpdate(tinfo, usid);
 +            
-             Map<KeyExtent,Long> failures = Translator.translate(updateErrors.failedExtents, Translator.TKET);
-             updatedConstraintViolations(Translator.translate(updateErrors.violationSummaries, Translator.TCVST));
-             updateAuthorizationFailures(Translator.translate(updateErrors.authorizationFailures, Translator.TKET));
++            Map<KeyExtent,Long> failures = Translator.translate(updateErrors.failedExtents, Translators.TKET);
++            updatedConstraintViolations(Translator.translate(updateErrors.violationSummaries, Translators.TCVST));
++            updateAuthorizationFailures(Translator.translate(updateErrors.authorizationFailures, Translators.TKET));
 +            
 +            long totalCommitted = 0;
 +            
 +            for (Entry<KeyExtent,Long> entry : failures.entrySet()) {
 +              KeyExtent failedExtent = entry.getKey();
 +              int numCommitted = (int) (long) entry.getValue(); // unbox the Long, then narrow to int; used below as the index of the first mutation to re-queue
 +              totalCommitted += numCommitted;
 +              
 +              String table = failedExtent.getTableId().toString();
 +              
 +              TabletLocator.getInstance(instance, new Text(table)).invalidateCache(failedExtent);
 +              
 +              ArrayList<Mutation> mutations = (ArrayList<Mutation>) tabMuts.get(failedExtent); // NOTE(review): ClassCastException if the list is not an ArrayList; a null lookup would fail on the next line — confirm callers
 +              allFailures.addAll(table, mutations.subList(numCommitted, mutations.size())); // only the mutations from numCommitted onward are recorded as failures
 +            }
 +            
 +            if (failures.keySet().containsAll(tabMuts.keySet()) && totalCommitted == 0) { // every tablet in the batch failed and none reported a committed mutation
 +              // nothing was successfully written
 +              timeoutTracker.wroteNothing();
 +            } else {
 +              // successfully wrote something to tablet server
 +              timeoutTracker.madeProgress();
 +            }
 +          }
 +          return allFailures;
 +        } finally {
 +          ThriftUtil.returnClient((TServiceClient) client); // runs on both the normal return and any exception thrown inside the inner try
 +        }
 +      } catch (TTransportException e) {
 +        timeoutTracker.errorOccured(e); // only transport errors are reported to the tracker; then rethrown as IOException
 +        throw new IOException(e);
 +      } catch (TApplicationException tae) {
 +        updateServerErrors(location, tae);
 +        throw new AccumuloServerException(location, tae);
 +      } catch (ThriftSecurityException e) {
 +        updateAuthorizationFailures(tabMuts.keySet(), e.code); // every extent in this batch is recorded with the same security error code
 +        throw new AccumuloSecurityException(e.user, e.code, e);
 +      } catch (NoSuchScanIDException e) {
 +        throw new IOException(e);
 +      } catch (TException e) {
 +        throw new IOException(e);
 +      } finally {
 +        ThriftTransportPool.getInstance().returnTransport(transport); // NOTE(review): transport is never assigned after its null initialisation in this method, so null is what is passed here
 +      }
 +    }
 +  }
 +  
 +  // END code for sending mutations to tablet servers using background threads
 +  
 +  private static class MutationSet { // groups mutations by table string and keeps a running total of their estimated memory
 +    
 +    private HashMap<String,List<Mutation>> mutations; // table string (as passed to addMutation) -> mutations in insertion order
 +    private int memoryUsed = 0; // NOTE(review): int accumulator; if estimatedMemoryUsed() returns a wider type, the += in addMutation narrows silently and could wrap — confirm
 +    
 +    MutationSet() {
 +      mutations = new HashMap<String,List<Mutation>>();
 +    }
 +    
 +    void addMutation(String table, Mutation mutation) { // appends one mutation under table and adds its estimate to memoryUsed
 +      List<Mutation> tabMutList = mutations.get(table);
 +      if (tabMutList == null) { // first mutation for this table: create and register its list
 +        tabMutList = new ArrayList<Mutation>();
 +        mutations.put(table, tabMutList);
 +      }
 +      
 +      tabMutList.add(mutation);
 +      
 +      memoryUsed += mutation.estimatedMemoryUsed();
 +    }
 +    
 +    Map<String,List<Mutation>> getMutations() {
 +      return mutations; // the internal map itself, not a copy; changes made through it bypass memoryUsed
 +    }
 +    
 +    int size() { // total number of mutations across all tables, recomputed on every call
 +      int result = 0;
 +      for (List<Mutation> perTable : mutations.values()) {
 +        result += perTable.size();
 +      }
 +      return result;
 +    }
 +    
 +    public void addAll(MutationSet failures) { // merges another set into this one
 +      Set<Entry<String,List<Mutation>>> es = failures.getMutations().entrySet();
 +      
 +      for (Entry<String,List<Mutation>> entry : es) {
 +        String table = entry.getKey();
 +        
 +        for (Mutation mutation : entry.getValue()) {
 +          addMutation(table, mutation); // goes through addMutation so memoryUsed is updated for each copied mutation
 +        }
 +      }
 +    }
 +    
 +    public void addAll(String table, List<Mutation> mutations) { // adds each listed mutation under table; the parameter shadows the mutations field
 +      for (Mutation mutation : mutations) {
 +        addMutation(table, mutation);
 +      }
 +    }
 +    
 +    public int getMemoryUsed() { // sum of estimatedMemoryUsed() over everything added via addMutation
 +      return memoryUsed;
 +    }
 +    
 +  }
 +}


Mime
View raw message