accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ctubb...@apache.org
Subject [40/61] [abbrv] [partial] accumulo git commit: ACCUMULO-722 put trunk in my sandbox
Date Thu, 03 Mar 2016 22:00:05 GMT
http://git-wip-us.apache.org/repos/asf/accumulo/blob/7bdbfccb/1.5/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java
----------------------------------------------------------------------
diff --git a/1.5/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java b/1.5/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java
new file mode 100644
index 0000000..bf74994
--- /dev/null
+++ b/1.5/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java
@@ -0,0 +1,890 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.client.impl;
+
+import java.io.IOException;
+import java.lang.management.CompilationMXBean;
+import java.lang.management.GarbageCollectorMXBean;
+import java.lang.management.ManagementFactory;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.accumulo.cloudtrace.instrument.Span;
+import org.apache.accumulo.cloudtrace.instrument.Trace;
+import org.apache.accumulo.cloudtrace.instrument.Tracer;
+import org.apache.accumulo.cloudtrace.thrift.TInfo;
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.MutationsRejectedException;
+import org.apache.accumulo.core.client.TableDeletedException;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.TableOfflineException;
+import org.apache.accumulo.core.client.impl.TabletLocator.TabletServerMutations;
+import org.apache.accumulo.core.constraints.Violations;
+import org.apache.accumulo.core.data.ConstraintViolationSummary;
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.thrift.TMutation;
+import org.apache.accumulo.core.data.thrift.UpdateErrors;
+import org.apache.accumulo.core.master.state.tables.TableState;
+import org.apache.accumulo.core.security.thrift.AuthInfo;
+import org.apache.accumulo.core.security.thrift.ThriftSecurityException;
+import org.apache.accumulo.core.tabletserver.thrift.ConstraintViolationException;
+import org.apache.accumulo.core.tabletserver.thrift.NoSuchScanIDException;
+import org.apache.accumulo.core.tabletserver.thrift.NotServingTabletException;
+import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
+import org.apache.accumulo.core.util.NamingThreadFactory;
+import org.apache.accumulo.core.util.ThriftUtil;
+import org.apache.hadoop.io.Text;
+import org.apache.log4j.Logger;
+import org.apache.thrift.TApplicationException;
+import org.apache.thrift.TException;
+import org.apache.thrift.TServiceClient;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+
+
+/*
+ * Differences from previous TabletServerBatchWriter
+ *   + As background threads finish sending mutations to tablet servers they decrement memory usage
+ *   + Once the queue of unprocessed mutations reaches 50% of the maximum memory, it is always pushed to the
+ *      background threads, even if they are currently processing; new mutations are merged with mutations
+ *      currently being processed in the background
+ *   + Failed mutations are held for at least 1000ms (the retry timer checks every 500ms) and then re-added
+ *      to the unprocessed queue
+ *   + While a flush is in progress, adding new mutations blocks, so the flush does not wait indefinitely
+ * 
+ * Considerations
+ *   + All background threads must catch and note Throwable
+ *   + mutations for a single tablet server are only processed by one thread concurrently (if new mutations 
+ *      come in for a tablet server while one thread is processing mutations for it, no other thread should 
+ *      start processing those mutations)
+ *   
+ * Memory accounting
+ *   + when a mutation enters the system memory is incremented
+ *   + when a mutation successfully leaves the system memory is decremented
+ * 
+ * 
+ * 
+ */
+
+public class TabletServerBatchWriter {
+  
+  private static final Logger log = Logger.getLogger(TabletServerBatchWriter.class);
+  
+  private long totalMemUsed = 0;
+  private long maxMem;
+  private MutationSet mutations;
+  private boolean flushing;
+  private boolean closed;
+  private MutationWriter writer;
+  private FailedMutations failedMutations;
+  
+  private Instance instance;
+  private AuthInfo credentials;
+  
+  private Violations violations;
+  private HashSet<KeyExtent> authorizationFailures;
+  private HashSet<String> serverSideErrors;
+  private int unknownErrors = 0;
+  private boolean somethingFailed = false;
+  
+  private Timer jtimer;
+  
+  private long maxLatency;
+  
+  private long lastProcessingStartTime;
+  
+  private long totalAdded = 0;
+  private AtomicLong totalSent = new AtomicLong(0);
+  private AtomicLong totalBinned = new AtomicLong(0);
+  private AtomicLong totalBinTime = new AtomicLong(0);
+  private AtomicLong totalSendTime = new AtomicLong(0);
+  private long startTime = 0;
+  private long initialGCTimes;
+  private long initialCompileTimes;
+  private double initialSystemLoad;
+  
+  private int tabletServersBatchSum = 0;
+  private int tabletBatchSum = 0;
+  private int numBatches = 0;
+  private int maxTabletBatch = Integer.MIN_VALUE;
+  private int minTabletBatch = Integer.MAX_VALUE;
+  private int minTabletServersBatch = Integer.MAX_VALUE;
+  private int maxTabletServersBatch = Integer.MIN_VALUE;
+  
+  private Throwable lastUnknownError = null;
+  
+  public TabletServerBatchWriter(Instance instance, AuthInfo credentials, long maxMemory, long maxLatency, int numSendThreads) {
+    this.instance = instance;
+    this.maxMem = maxMemory;
+    this.maxLatency = maxLatency <= 0 ? Long.MAX_VALUE : maxLatency;
+    this.credentials = credentials;
+    mutations = new MutationSet();
+    
+    violations = new Violations();
+    
+    authorizationFailures = new HashSet<KeyExtent>();
+    serverSideErrors = new HashSet<String>();
+    
+    lastProcessingStartTime = System.currentTimeMillis();
+    
+    jtimer = new Timer("BatchWriterLatencyTimer", true);
+    
+    writer = new MutationWriter(numSendThreads);
+    failedMutations = new FailedMutations();
+    
+    if (this.maxLatency != Long.MAX_VALUE) {
+      jtimer.schedule(new TimerTask() {
+        public void run() {
+          try {
+            synchronized (TabletServerBatchWriter.this) {
+              if ((System.currentTimeMillis() - lastProcessingStartTime) > TabletServerBatchWriter.this.maxLatency)
+                startProcessing();
+            }
+          } catch (Throwable t) {
+            updateUnknownErrors("Max latency task failed " + t.getMessage(), t);
+          }
+        }
+      }, 0, this.maxLatency / 4);
+    }
+  }
+  
+  private synchronized void startProcessing() {
+    if (mutations.getMemoryUsed() == 0)
+      return;
+    lastProcessingStartTime = System.currentTimeMillis();
+    writer.addMutations(mutations);
+    mutations = new MutationSet();
+  }
+  
+  private synchronized void decrementMemUsed(long amount) {
+    totalMemUsed -= amount;
+    this.notifyAll();
+  }
+  
+  public synchronized void addMutation(String table, Mutation m) throws MutationsRejectedException {
+    
+    if (closed)
+      throw new IllegalStateException("Closed");
+    if (m.size() == 0)
+      throw new IllegalArgumentException("Can not add empty mutations");
+    
+    checkForFailures();
+    
+    while ((totalMemUsed >= maxMem || flushing) && !somethingFailed) {
+      waitRTE();
+    }
+    
+    // do checks again since things could have changed while waiting and not holding lock
+    if (closed)
+      throw new IllegalStateException("Closed");
+    checkForFailures();
+    
+    if (startTime == 0) {
+      startTime = System.currentTimeMillis();
+      
+      List<GarbageCollectorMXBean> gcmBeans = ManagementFactory.getGarbageCollectorMXBeans();
+      for (GarbageCollectorMXBean garbageCollectorMXBean : gcmBeans) {
+        initialGCTimes += garbageCollectorMXBean.getCollectionTime();
+      }
+      
+      CompilationMXBean compMxBean = ManagementFactory.getCompilationMXBean();
+      if (compMxBean.isCompilationTimeMonitoringSupported()) {
+        initialCompileTimes = compMxBean.getTotalCompilationTime();
+      }
+      
+      initialSystemLoad = ManagementFactory.getOperatingSystemMXBean().getSystemLoadAverage();
+    }
+    
+    // create a copy of mutation so that after this method returns the user
+    // is free to reuse the mutation object, like calling readFields... this
+    // is important for the case where a mutation is passed from map to reduce
+    // to batch writer... the map reduce code will keep passing the same mutation
+    // object into the reduce method
+    m = new Mutation(m);
+    
+    totalMemUsed += m.estimatedMemoryUsed();
+    mutations.addMutation(table, m);
+    totalAdded++;
+    
+    if (mutations.getMemoryUsed() >= maxMem / 2) {
+      startProcessing();
+      checkForFailures();
+    }
+  }
+  
+  public synchronized void addMutation(String table, Iterator<Mutation> iterator) throws MutationsRejectedException {
+    while (iterator.hasNext()) {
+      addMutation(table, iterator.next());
+    }
+  }
+  
+  public synchronized void flush() throws MutationsRejectedException {
+    
+    if (closed)
+      throw new IllegalStateException("Closed");
+    
+    Span span = Trace.start("flush");
+    
+    try {
+      checkForFailures();
+      
+      if (flushing) {
+        // some other thread is currently flushing, so wait
+        while (flushing && !somethingFailed)
+          waitRTE();
+        
+        checkForFailures();
+        
+        return;
+      }
+      
+      flushing = true;
+      
+      startProcessing();
+      checkForFailures();
+      
+      while (totalMemUsed > 0 && !somethingFailed) {
+        waitRTE();
+      }
+      
+      flushing = false;
+      this.notifyAll();
+      
+      checkForFailures();
+    } finally {
+      span.stop();
+    }
+  }
+  
+  public synchronized void close() throws MutationsRejectedException {
+    
+    if (closed)
+      return;
+    
+    Span span = Trace.start("close");
+    try {
+      closed = true;
+      
+      startProcessing();
+      
+      while (totalMemUsed > 0 && !somethingFailed) {
+        waitRTE();
+      }
+      
+      logStats();
+      
+      checkForFailures();
+    } finally {
+      // make a best effort to release these resources
+      writer.sendThreadPool.shutdownNow();
+      jtimer.cancel();
+      span.stop();
+    }
+  }
+  
+  private void logStats() {
+    long finishTime = System.currentTimeMillis();
+    
+    long finalGCTimes = 0;
+    List<GarbageCollectorMXBean> gcmBeans = ManagementFactory.getGarbageCollectorMXBeans();
+    for (GarbageCollectorMXBean garbageCollectorMXBean : gcmBeans) {
+      finalGCTimes += garbageCollectorMXBean.getCollectionTime();
+    }
+    
+    CompilationMXBean compMxBean = ManagementFactory.getCompilationMXBean();
+    long finalCompileTimes = 0;
+    if (compMxBean.isCompilationTimeMonitoringSupported()) {
+      finalCompileTimes = compMxBean.getTotalCompilationTime();
+    }
+    
+    double averageRate = totalSent.get() / (totalSendTime.get() / 1000.0);
+    double overallRate = totalAdded / ((finishTime - startTime) / 1000.0);
+    
+    double finalSystemLoad = ManagementFactory.getOperatingSystemMXBean().getSystemLoadAverage();
+    
+    if (log.isTraceEnabled()) {
+      log.trace("");
+      log.trace("TABLET SERVER BATCH WRITER STATISTICS");
+      log.trace(String.format("Added                : %,10d mutations", totalAdded));
+      log.trace(String.format("Sent                 : %,10d mutations", totalSent.get()));
+      log.trace(String.format("Resent percentage   : %10.2f%s", (totalSent.get() - totalAdded) / (double) totalAdded * 100.0, "%"));
+      log.trace(String.format("Overall time         : %,10.2f secs", (finishTime - startTime) / 1000.0));
+      log.trace(String.format("Overall send rate    : %,10.2f mutations/sec", overallRate));
+      log.trace(String.format("Send efficiency      : %10.2f%s", overallRate / averageRate * 100.0, "%"));
+      log.trace("");
+      log.trace("BACKGROUND WRITER PROCESS STATISTICS");
+      log.trace(String.format("Total send time      : %,10.2f secs %6.2f%s", totalSendTime.get() / 1000.0, 100.0 * totalSendTime.get()
+          / (finishTime - startTime), "%"));
+      log.trace(String.format("Average send rate    : %,10.2f mutations/sec", averageRate));
+      log.trace(String.format("Total bin time       : %,10.2f secs %6.2f%s", totalBinTime.get() / 1000.0,
+          100.0 * totalBinTime.get() / (finishTime - startTime), "%"));
+      log.trace(String.format("Average bin rate     : %,10.2f mutations/sec", totalBinned.get() / (totalBinTime.get() / 1000.0)));
+      log.trace(String.format("tservers per batch   : %,8.2f avg  %,6d min %,6d max", (tabletServersBatchSum / (double) numBatches), minTabletServersBatch,
+          maxTabletServersBatch));
+      log.trace(String.format("tablets per batch    : %,8.2f avg  %,6d min %,6d max", (tabletBatchSum / (double) numBatches), minTabletBatch, maxTabletBatch));
+      log.trace("");
+      log.trace("SYSTEM STATISTICS");
+      log.trace(String.format("JVM GC Time          : %,10.2f secs", ((finalGCTimes - initialGCTimes) / 1000.0)));
+      if (compMxBean.isCompilationTimeMonitoringSupported()) {
+        log.trace(String.format("JVM Compile Time     : %,10.2f secs", ((finalCompileTimes - initialCompileTimes) / 1000.0)));
+      }
+      log.trace(String.format("System load average : initial=%6.2f final=%6.2f", initialSystemLoad, finalSystemLoad));
+    }
+  }
+  
+  private void updateSendStats(long count, long time) {
+    totalSent.addAndGet(count);
+    totalSendTime.addAndGet(time);
+  }
+  
+  public void updateBinningStats(int count, long time, Map<String,TabletServerMutations> binnedMutations) {
+    totalBinTime.addAndGet(time);
+    totalBinned.addAndGet(count);
+    updateBatchStats(binnedMutations);
+  }
+  
+  private synchronized void updateBatchStats(Map<String,TabletServerMutations> binnedMutations) {
+    tabletServersBatchSum += binnedMutations.size();
+    
+    minTabletServersBatch = Math.min(minTabletServersBatch, binnedMutations.size());
+    maxTabletServersBatch = Math.max(maxTabletServersBatch, binnedMutations.size());
+    
+    int numTablets = 0;
+    
+    for (Entry<String,TabletServerMutations> entry : binnedMutations.entrySet()) {
+      TabletServerMutations tsm = entry.getValue();
+      numTablets += tsm.getMutations().size();
+    }
+    
+    tabletBatchSum += numTablets;
+    
+    minTabletBatch = Math.min(minTabletBatch, numTablets);
+    maxTabletBatch = Math.max(maxTabletBatch, numTablets);
+    
+    numBatches++;
+  }
+  
+  private void waitRTE() {
+    try {
+      wait();
+    } catch (InterruptedException e) {
+      throw new RuntimeException(e);
+    }
+  }
+  
+  // BEGIN code for handling unrecoverable errors
+  
+  private void updatedConstraintViolations(List<ConstraintViolationSummary> cvsList) {
+    if (cvsList.size() > 0) {
+      synchronized (this) {
+        somethingFailed = true;
+        violations.add(cvsList);
+        this.notifyAll();
+      }
+    }
+  }
+  
+  private void updateAuthorizationFailures(Collection<KeyExtent> authorizationFailures) {
+    if (authorizationFailures.size() > 0) {
+      
+      // was a table deleted?
+      HashSet<String> tableIds = new HashSet<String>();
+      for (KeyExtent ke : authorizationFailures)
+        tableIds.add(ke.getTableId().toString());
+      
+      Tables.clearCache(instance);
+      for (String tableId : tableIds)
+        if (!Tables.exists(instance, tableId))
+          throw new TableDeletedException(tableId);
+      
+      synchronized (this) {
+        somethingFailed = true;
+        this.authorizationFailures.addAll(authorizationFailures);
+        this.notifyAll();
+      }
+    }
+  }
+  
+  private synchronized void updateServerErrors(String server, Exception e) {
+    somethingFailed = true;
+    this.serverSideErrors.add(server);
+    this.notifyAll();
+    log.error("Server side error on " + server);
+  }
+  
+  private synchronized void updateUnknownErrors(String msg, Throwable t) {
+    somethingFailed = true;
+    unknownErrors++;
+    this.lastUnknownError = t;
+    this.notifyAll();
+    if (t instanceof TableDeletedException || t instanceof TableOfflineException)
+      log.debug(msg, t); // this is not unknown
+    else
+      log.error(msg, t);
+  }
+  
+  private void checkForFailures() throws MutationsRejectedException {
+    if (somethingFailed) {
+      List<ConstraintViolationSummary> cvsList = violations.asList();
+      throw new MutationsRejectedException(cvsList, new ArrayList<KeyExtent>(authorizationFailures), serverSideErrors, unknownErrors, lastUnknownError);
+    }
+  }
+  
+  // END code for handling unrecoverable errors
+  
+  // BEGIN code for handling failed mutations
+  
+  /**
+   * Add mutations that previously failed back into the mix
+   * 
+   * @param mutationsprivate
+   *          static final Logger log = Logger.getLogger(TabletServerBatchWriter.class);
+   */
+  private synchronized void addFailedMutations(MutationSet failedMutations) throws Exception {
+    mutations.addAll(failedMutations);
+    if (mutations.getMemoryUsed() >= maxMem / 2 || closed || flushing) {
+      startProcessing();
+    }
+  }
+  
+  private class FailedMutations extends TimerTask {
+    
+    private MutationSet recentFailures = null;
+    private long initTime;
+    
+    FailedMutations() {
+      jtimer.schedule(this, 0, 500);
+    }
+    
+    private MutationSet init() {
+      if (recentFailures == null) {
+        recentFailures = new MutationSet();
+        initTime = System.currentTimeMillis();
+      }
+      return recentFailures;
+    }
+    
+    synchronized void add(String table, ArrayList<Mutation> tableFailures) {
+      init().addAll(table, tableFailures);
+    }
+    
+    synchronized void add(MutationSet failures) {
+      init().addAll(failures);
+    }
+    
+    synchronized void add(String location, TabletServerMutations tsm) {
+      init();
+      for (Entry<KeyExtent,List<Mutation>> entry : tsm.getMutations().entrySet()) {
+        recentFailures.addAll(entry.getKey().getTableId().toString(), entry.getValue());
+      }
+      
+    }
+    
+    @Override
+    public void run() {
+      try {
+        MutationSet rf = null;
+        
+        synchronized (this) {
+          if (recentFailures != null && System.currentTimeMillis() - initTime > 1000) {
+            rf = recentFailures;
+            recentFailures = null;
+          }
+        }
+        
+        if (rf != null) {
+          if (log.isTraceEnabled())
+            log.trace("requeuing " + rf.size() + " failed mutations");
+          addFailedMutations(rf);
+        }
+      } catch (Throwable t) {
+        updateUnknownErrors("Failed to requeue failed mutations " + t.getMessage(), t);
+        cancel();
+      }
+    }
+  }
+  
+  // END code for handling failed mutations
+  
+  // BEGIN code for sending mutations to tablet servers using background threads
+  
+  private class MutationWriter {
+    
+    private static final int MUTATION_BATCH_SIZE = 1 << 17;
+    private ExecutorService sendThreadPool;
+    private Map<String,TabletServerMutations> serversMutations;
+    private Set<String> queued;
+    
+    public MutationWriter(int numSendThreads) {
+      serversMutations = new HashMap<String,TabletServerMutations>();
+      queued = new HashSet<String>();
+      sendThreadPool = Executors.newFixedThreadPool(numSendThreads, new NamingThreadFactory(this.getClass().getName()));
+    }
+    
+    private void binMutations(MutationSet mutationsToProcess, Map<String,TabletServerMutations> binnedMutations) {
+      try {
+        Set<Entry<String,List<Mutation>>> es = mutationsToProcess.getMutations().entrySet();
+        for (Entry<String,List<Mutation>> entry : es) {
+          TabletLocator locator = TabletLocator.getInstance(instance, credentials, new Text(entry.getKey()));
+          
+          String table = entry.getKey();
+          List<Mutation> tableMutations = entry.getValue();
+          
+          if (tableMutations != null) {
+            ArrayList<Mutation> tableFailures = new ArrayList<Mutation>();
+            locator.binMutations(tableMutations, binnedMutations, tableFailures);
+            
+            if (tableFailures.size() > 0) {
+              failedMutations.add(table, tableFailures);
+              
+              if (tableFailures.size() == tableMutations.size())
+                if (!Tables.exists(instance, entry.getKey()))
+                  throw new TableDeletedException(entry.getKey());
+                else if (Tables.getTableState(instance, table) == TableState.OFFLINE)
+                  throw new TableOfflineException(instance, entry.getKey());
+            }
+          }
+          
+        }
+        return;
+      } catch (AccumuloServerException ase) {
+        updateServerErrors(ase.getServer(), ase);
+      } catch (AccumuloException ae) {
+        // assume an IOError communicating with !METADATA tablet
+        failedMutations.add(mutationsToProcess);
+      } catch (AccumuloSecurityException e) {
+        updateAuthorizationFailures(Collections.singletonList(new KeyExtent(new Text(Constants.METADATA_TABLE_ID), null, null)));
+      } catch (TableDeletedException e) {
+        updateUnknownErrors(e.getMessage(), e);
+      } catch (TableOfflineException e) {
+        updateUnknownErrors(e.getMessage(), e);
+      } catch (TableNotFoundException e) {
+        updateUnknownErrors(e.getMessage(), e);
+      }
+      
+      // an error ocurred
+      binnedMutations.clear();
+      
+    }
+    
+    void addMutations(MutationSet mutationsToSend) {
+      Map<String,TabletServerMutations> binnedMutations = new HashMap<String,TabletServerMutations>();
+      Span span = Trace.start("binMutations");
+      try {
+        long t1 = System.currentTimeMillis();
+        binMutations(mutationsToSend, binnedMutations);
+        long t2 = System.currentTimeMillis();
+        updateBinningStats(mutationsToSend.size(), (t2 - t1), binnedMutations);
+      } finally {
+        span.stop();
+      }
+      addMutations(binnedMutations);
+    }
+    
+    private synchronized void addMutations(Map<String,TabletServerMutations> binnedMutations) {
+      
+      int count = 0;
+      
+      // merge mutations into existing mutations for a tablet server
+      for (Entry<String,TabletServerMutations> entry : binnedMutations.entrySet()) {
+        String server = entry.getKey();
+        
+        TabletServerMutations currentMutations = serversMutations.get(server);
+        
+        if (currentMutations == null) {
+          serversMutations.put(server, entry.getValue());
+        } else {
+          for (Entry<KeyExtent,List<Mutation>> entry2 : entry.getValue().getMutations().entrySet()) {
+            for (Mutation m : entry2.getValue()) {
+              currentMutations.addMutation(entry2.getKey(), m);
+            }
+          }
+        }
+        
+        if (log.isTraceEnabled())
+          for (Entry<KeyExtent,List<Mutation>> entry2 : entry.getValue().getMutations().entrySet())
+            count += entry2.getValue().size();
+        
+      }
+      
+      if (count > 0 && log.isTraceEnabled())
+        log.trace(String.format("Started sending %,d mutations to %,d tablet servers", count, binnedMutations.keySet().size()));
+      
+      // randomize order of servers
+      ArrayList<String> servers = new ArrayList<String>(binnedMutations.keySet());
+      Collections.shuffle(servers);
+      
+      for (String server : servers)
+        if (!queued.contains(server)) {
+          sendThreadPool.submit(Trace.wrap(new SendTask(server)));
+          queued.add(server);
+        }
+    }
+    
+    private synchronized TabletServerMutations getMutationsToSend(String server) {
+      TabletServerMutations tsmuts = serversMutations.remove(server);
+      if (tsmuts == null)
+        queued.remove(server);
+      
+      return tsmuts;
+    }
+    
+    class SendTask implements Runnable {
+      
+      private String location;
+      
+      SendTask(String server) {
+        this.location = server;
+      }
+      
+      @Override
+      public void run() {
+        try {
+          TabletServerMutations tsmuts = getMutationsToSend(location);
+          
+          while (tsmuts != null) {
+            send(tsmuts);
+            tsmuts = getMutationsToSend(location);
+          }
+          
+          return;
+        } catch (Throwable t) {
+          updateUnknownErrors("Failed to send tablet server " + location + " its batch : " + t.getMessage(), t);
+        }
+      }
+      
+      public void send(TabletServerMutations tsm) throws AccumuloServerException, AccumuloSecurityException {
+        
+        MutationSet failures = null;
+        
+        String oldName = Thread.currentThread().getName();
+        
+        Map<KeyExtent,List<Mutation>> mutationBatch = tsm.getMutations();
+        try {
+          
+          long count = 0;
+          for (List<Mutation> list : mutationBatch.values()) {
+            count += list.size();
+          }
+          String msg = "sending " + String.format("%,d", count) + " mutations to " + String.format("%,d", mutationBatch.size()) + " tablets at " + location;
+          Thread.currentThread().setName(msg);
+          
+          Span span = Trace.start("sendMutations");
+          try {
+            long st1 = System.currentTimeMillis();
+            failures = sendMutationsToTabletServer(location, mutationBatch);
+            long st2 = System.currentTimeMillis();
+            if (log.isTraceEnabled())
+              log.trace("sent " + String.format("%,d", count) + " mutations to " + location + " in "
+                  + String.format("%.2f secs (%,.2f mutations/sec) with %,d failures", (st2 - st1) / 1000.0, count / ((st2 - st1) / 1000.0), failures.size()));
+            
+            long successBytes = 0;
+            for (Entry<KeyExtent,List<Mutation>> entry : mutationBatch.entrySet()) {
+              for (Mutation mutation : entry.getValue()) {
+                successBytes += mutation.estimatedMemoryUsed();
+              }
+            }
+            
+            if (failures.size() > 0) {
+              failedMutations.add(failures);
+              successBytes -= failures.getMemoryUsed();
+            }
+            
+            updateSendStats(count, st2 - st1);
+            decrementMemUsed(successBytes);
+            
+          } finally {
+            span.stop();
+          }
+        } catch (IOException e) {
+          if (log.isTraceEnabled())
+            log.trace("failed to send mutations to " + location + " : " + e.getMessage());
+          
+          HashSet<String> tables = new HashSet<String>();
+          for (KeyExtent ke : mutationBatch.keySet())
+            tables.add(ke.getTableId().toString());
+          
+          for (String table : tables)
+            TabletLocator.getInstance(instance, credentials, new Text(table)).invalidateCache(location);
+          
+          failedMutations.add(location, tsm);
+        } finally {
+          Thread.currentThread().setName(oldName);
+        }
+      }
+    }
+    
+    private MutationSet sendMutationsToTabletServer(String location, Map<KeyExtent,List<Mutation>> tabMuts) throws IOException, AccumuloSecurityException,
+        AccumuloServerException {
+      if (tabMuts.size() == 0) {
+        return new MutationSet();
+      }
+      TInfo tinfo = Tracer.traceInfo();
+      TTransport transport = null;
+      
+      try {
+        TabletClientService.Iface client = ThriftUtil.getTServerClient(location, instance.getConfiguration());
+        try {
+          MutationSet allFailures = new MutationSet();
+          
+          if (tabMuts.size() == 1 && tabMuts.values().iterator().next().size() == 1) {
+            Entry<KeyExtent,List<Mutation>> entry = tabMuts.entrySet().iterator().next();
+            
+            try {
+              client.update(tinfo, credentials, entry.getKey().toThrift(), entry.getValue().get(0).toThrift());
+            } catch (NotServingTabletException e) {
+              allFailures.addAll(entry.getKey().getTableId().toString(), entry.getValue());
+              TabletLocator.getInstance(instance, credentials, new Text(entry.getKey().getTableId())).invalidateCache(entry.getKey());
+            } catch (ConstraintViolationException e) {
+              updatedConstraintViolations(Translator.translate(e.violationSummaries, Translator.TCVST));
+            }
+          } else {
+            
+            long usid = client.startUpdate(tinfo, credentials);
+            
+            List<TMutation> updates = new ArrayList<TMutation>();
+            for (Entry<KeyExtent,List<Mutation>> entry : tabMuts.entrySet()) {
+              long size = 0;
+              Iterator<Mutation> iter = entry.getValue().iterator();
+              while (iter.hasNext()) {
+                while (size < MUTATION_BATCH_SIZE && iter.hasNext()) {
+                  Mutation mutation = iter.next();
+                  updates.add(mutation.toThrift());
+                  size += mutation.numBytes();
+                }
+                
+                client.applyUpdates(tinfo, usid, entry.getKey().toThrift(), updates);
+                updates.clear();
+                size = 0;
+              }
+            }
+            
+            UpdateErrors updateErrors = client.closeUpdate(tinfo, usid);
+            Map<KeyExtent,Long> failures = Translator.translate(updateErrors.failedExtents, Translator.TKET);
+            updatedConstraintViolations(Translator.translate(updateErrors.violationSummaries, Translator.TCVST));
+            updateAuthorizationFailures(Translator.translate(updateErrors.authorizationFailures, Translator.TKET));
+            
+            for (Entry<KeyExtent,Long> entry : failures.entrySet()) {
+              KeyExtent failedExtent = entry.getKey();
+              int numCommitted = (int) (long) entry.getValue();
+              
+              String table = failedExtent.getTableId().toString();
+              
+              TabletLocator.getInstance(instance, credentials, new Text(table)).invalidateCache(failedExtent);
+              
+              ArrayList<Mutation> mutations = (ArrayList<Mutation>) tabMuts.get(failedExtent);
+              allFailures.addAll(table, mutations.subList(numCommitted, mutations.size()));
+            }
+          }
+          return allFailures;
+        } finally {
+          ThriftUtil.returnClient((TServiceClient) client);
+        }
+      } catch (TTransportException e) {
+        throw new IOException(e);
+      } catch (TApplicationException tae) {
+        updateServerErrors(location, tae);
+        throw new AccumuloServerException(location, tae);
+      } catch (ThriftSecurityException e) {
+        updateAuthorizationFailures(tabMuts.keySet());
+        throw new AccumuloSecurityException(e.user, e.code, e);
+      } catch (TException e) {
+        throw new IOException(e);
+      } catch (NoSuchScanIDException e) {
+        throw new IOException(e);
+      } finally {
+        ThriftTransportPool.getInstance().returnTransport(transport);
+      }
+    }
+    
+  }
+  
+  // END code for sending mutations to tablet servers using background threads
+  
+  private static class MutationSet {
+    
+    private HashMap<String,List<Mutation>> mutations;
+    private int memoryUsed = 0;
+    
+    MutationSet() {
+      mutations = new HashMap<String,List<Mutation>>();
+    }
+    
+    void addMutation(String table, Mutation mutation) {
+      List<Mutation> tabMutList = mutations.get(table);
+      if (tabMutList == null) {
+        tabMutList = new ArrayList<Mutation>();
+        mutations.put(table, tabMutList);
+      }
+      
+      tabMutList.add(mutation);
+      
+      memoryUsed += mutation.estimatedMemoryUsed();
+    }
+    
+    Map<String,List<Mutation>> getMutations() {
+      return mutations;
+    }
+    
+    int size() {
+      int result = 0;
+      for (List<Mutation> perTable : mutations.values()) {
+        result += perTable.size();
+      }
+      return result;
+    }
+    
+    public void addAll(MutationSet failures) {
+      Set<Entry<String,List<Mutation>>> es = failures.getMutations().entrySet();
+      
+      for (Entry<String,List<Mutation>> entry : es) {
+        String table = entry.getKey();
+        
+        for (Mutation mutation : entry.getValue()) {
+          addMutation(table, mutation);
+        }
+      }
+    }
+    
+    public void addAll(String table, List<Mutation> mutations) {
+      for (Mutation mutation : mutations) {
+        addMutation(table, mutation);
+      }
+    }
+    
+    public int getMemoryUsed() {
+      return memoryUsed;
+    }
+    
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7bdbfccb/1.5/core/src/main/java/org/apache/accumulo/core/client/impl/TabletType.java
----------------------------------------------------------------------
diff --git a/1.5/core/src/main/java/org/apache/accumulo/core/client/impl/TabletType.java b/1.5/core/src/main/java/org/apache/accumulo/core/client/impl/TabletType.java
new file mode 100644
index 0000000..11b94a3
--- /dev/null
+++ b/1.5/core/src/main/java/org/apache/accumulo/core/client/impl/TabletType.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.client.impl;
+
+import java.util.Collection;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.data.KeyExtent;
+
+/**
+ * Coarse classification of a tablet: the root tablet, a metadata tablet, or a user tablet.
+ */
+public enum TabletType {
+  ROOT, METADATA, USER;
+  
+  /**
+   * Classifies a single extent.
+   * 
+   * @param ke
+   *          the extent to classify
+   * @return {@link #ROOT} if {@code ke.isRootTablet()}, else {@link #METADATA} if {@code ke.isMeta()}, else {@link #USER}
+   */
+  public static TabletType type(KeyExtent ke) {
+    // the root check is done first; presumably the root tablet would also satisfy isMeta() -- confirm in KeyExtent
+    if (ke.isRootTablet())
+      return ROOT;
+    if (ke.isMeta())
+      return METADATA;
+    return USER;
+  }
+  
+  /**
+   * Classifies a group of extents that must all be of the same type.
+   * 
+   * @param extents
+   *          the extents to classify; must be non-empty
+   * @return the single type shared by every extent
+   * @throws IllegalArgumentException
+   *           if {@code extents} is empty or contains extents of more than one type
+   */
+  public static TabletType type(Collection<KeyExtent> extents) {
+    if (extents.isEmpty())
+      throw new IllegalArgumentException("at least one extent is required to determine a tablet type");
+    
+    TabletType ttype = null;
+    
+    for (KeyExtent extent : extents) {
+      // classify each extent once instead of re-evaluating type(extent) in the comparison and the message
+      TabletType current = type(extent);
+      if (ttype == null)
+        ttype = current;
+      else if (ttype != current)
+        throw new IllegalArgumentException("multiple extent types not allowed " + ttype + " " + current);
+    }
+    
+    return ttype;
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7bdbfccb/1.5/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftScanner.java
----------------------------------------------------------------------
diff --git a/1.5/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftScanner.java b/1.5/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftScanner.java
new file mode 100644
index 0000000..3ad8397
--- /dev/null
+++ b/1.5/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftScanner.java
@@ -0,0 +1,480 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.client.impl;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.EnumMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.SortedSet;
+
+import org.apache.accumulo.cloudtrace.instrument.Span;
+import org.apache.accumulo.cloudtrace.instrument.Trace;
+import org.apache.accumulo.cloudtrace.instrument.Tracer;
+import org.apache.accumulo.cloudtrace.thrift.TInfo;
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.TableDeletedException;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.TableOfflineException;
+import org.apache.accumulo.core.client.impl.TabletLocator.TabletLocation;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.data.Column;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.data.KeyValue;
+import org.apache.accumulo.core.data.PartialKey;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.data.thrift.InitialScan;
+import org.apache.accumulo.core.data.thrift.IterInfo;
+import org.apache.accumulo.core.data.thrift.ScanResult;
+import org.apache.accumulo.core.data.thrift.TKeyValue;
+import org.apache.accumulo.core.master.state.tables.TableState;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.security.thrift.AuthInfo;
+import org.apache.accumulo.core.security.thrift.ThriftSecurityException;
+import org.apache.accumulo.core.tabletserver.thrift.NoSuchScanIDException;
+import org.apache.accumulo.core.tabletserver.thrift.NotServingTabletException;
+import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
+import org.apache.accumulo.core.tabletserver.thrift.TooManyFilesException;
+import org.apache.accumulo.core.util.OpTimer;
+import org.apache.accumulo.core.util.TextUtil;
+import org.apache.accumulo.core.util.ThriftUtil;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.hadoop.io.Text;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.thrift.TApplicationException;
+import org.apache.thrift.TException;
+import org.apache.thrift.TServiceClient;
+
+
+/**
+ * Client-side helpers for reading key/value data from tablet servers over Thrift: a one-shot batch fetch
+ * ({@code getBatchFromServer}) and a retrying, tablet-by-tablet scan driven by a {@link ScanState}.
+ */
+public class ThriftScanner {
+  private static final byte[] EMPTY_BYTES = new byte[0];
+  private static final Logger log = Logger.getLogger(ThriftScanner.class);
+  
+  /**
+   * Per tablet type, the tablet servers to which this class has already sent a startScan with
+   * {@code waitForWrites == true}; later scans to those servers pass {@code false}. Each set is synchronized and
+   * the map is fully populated by the static initializer below.
+   */
+  public static Map<TabletType,Set<String>> serversWaitedForWrites = new EnumMap<TabletType,Set<String>>(TabletType.class);
+  
+  static {
+    for (TabletType ttype : TabletType.values()) {
+      serversWaitedForWrites.put(ttype, Collections.synchronizedSet(new HashSet<String>()));
+    }
+  }
+  
+  /**
+   * Fetches a single batch of key/values for {@code extent} from {@code server}, starting at {@code startRow}.
+   * 
+   * <p>
+   * When {@code fetchedColumns} is non-empty, the start key is built from the first fetched column's family, qualifier
+   * and visibility (a null qualifier or visibility becomes empty) with timestamp {@code Long.MAX_VALUE}; otherwise it
+   * is {@code new Key(startRow)}. When {@code skipStartKey} is set the scan begins at
+   * {@code startKey.followingKey(PartialKey.ROW)}, otherwise the start key's timestamp is set to {@code Long.MAX_VALUE}.
+   * The scan has no end key.
+   * 
+   * @return {@code true} if the server reported more data beyond this batch
+   */
+  public static boolean getBatchFromServer(AuthInfo credentials, Text startRow, KeyExtent extent, String server, SortedMap<Key,Value> results,
+      SortedSet<Column> fetchedColumns, boolean skipStartKey, int size, Authorizations authorizations, boolean retry, AccumuloConfiguration conf)
+      throws AccumuloException, AccumuloSecurityException, NotServingTabletException {
+    Key startKey;
+    
+    if (fetchedColumns.size() > 0) {
+      byte[] cf = fetchedColumns.first().columnFamily;
+      byte[] cq = fetchedColumns.first().columnQualifier;
+      byte[] cv = fetchedColumns.first().columnVisibility;
+      
+      startKey = new Key(TextUtil.getBytes(startRow), cf, cq == null ? EMPTY_BYTES : cq, cv == null ? EMPTY_BYTES : cv, Long.MAX_VALUE);
+      
+    } else
+      startKey = new Key(startRow);
+    
+    if (skipStartKey)
+      startKey = startKey.followingKey(PartialKey.ROW);
+    else
+      startKey.setTimestamp(Long.MAX_VALUE);
+    
+    return getBatchFromServer(credentials, startKey, (Key) null, extent, server, results, fetchedColumns, size, authorizations, retry, conf);
+  }
+  
+  /**
+   * Fetches a single batch over the range {@code [key, endKey]} (both endpoints inclusive). A null {@code endKey} is
+   * passed straight through to {@link Range}; presumably that means unbounded -- confirm against Range.
+   * 
+   * @return {@code true} if the server reported more data beyond this batch
+   */
+  static boolean getBatchFromServer(AuthInfo credentials, Key key, Key endKey, KeyExtent extent, String server, SortedMap<Key,Value> results,
+      SortedSet<Column> fetchedColumns, int size, Authorizations authorizations, boolean retry, AccumuloConfiguration conf) throws AccumuloException,
+      AccumuloSecurityException, NotServingTabletException {
+    return getBatchFromServer(credentials, new Range(key, true, endKey, true), extent, server, results, fetchedColumns, size, authorizations, retry, conf);
+  }
+  
+  /**
+   * Starts a scan of {@code range} on {@code server}, copies the first (and only) batch into {@code results}, then
+   * closes the scan session.
+   * 
+   * @param retry
+   *          currently ignored by this implementation; no retry is attempted here
+   * @return {@code true} if the server reported more data beyond this batch
+   * @throws AccumuloServerException
+   *           if the server answered with a {@link TApplicationException}
+   * @throws AccumuloException
+   *           if {@code server} is null, or the tablet has too many files, or a Thrift transport error occurred; in the
+   *           last two cases the underlying exception is attached as the cause
+   * @throws AccumuloSecurityException
+   *           if the server rejected the request for security reasons
+   * @throws NotServingTabletException
+   *           propagated from the server unchanged
+   */
+  static boolean getBatchFromServer(AuthInfo credentials, Range range, KeyExtent extent, String server, SortedMap<Key,Value> results,
+      SortedSet<Column> fetchedColumns, int size, Authorizations authorizations, boolean retry, AccumuloConfiguration conf) throws AccumuloException,
+      AccumuloSecurityException, NotServingTabletException {
+    if (server == null)
+      throw new AccumuloException(new IOException());
+    
+    try {
+      TInfo tinfo = Tracer.traceInfo();
+      TabletClientService.Client client = ThriftUtil.getTServerClient(server, conf);
+      try {
+        List<IterInfo> emptyList = Collections.emptyList();
+        Map<String,Map<String,String>> emptyMap = Collections.emptyMap();
+        // not reading whole rows (or stopping on row boundries) so there is no need to enable isolation below
+        ScanState scanState = new ScanState(credentials, extent.getTableId(), authorizations, range, fetchedColumns, size, emptyList, emptyMap, false);
+        
+        TabletType ttype = TabletType.type(extent);
+        boolean waitForWrites = !serversWaitedForWrites.get(ttype).contains(server);
+        InitialScan isr = client.startScan(tinfo, scanState.credentials, extent.toThrift(), scanState.range.toThrift(),
+            Translator.translate(scanState.columns, Translator.CT), scanState.size, scanState.serverSideIteratorList, scanState.serverSideIteratorOptions,
+            scanState.authorizations.getAuthorizationsBB(), waitForWrites, scanState.isolated);
+        if (waitForWrites)
+          serversWaitedForWrites.get(ttype).add(server);
+        
+        Key.decompress(isr.result.results);
+        
+        for (TKeyValue kv : isr.result.results)
+          results.put(new Key(kv.key), new Value(kv.value));
+        
+        client.closeScan(tinfo, isr.scanID);
+        
+        return isr.result.more;
+      } finally {
+        ThriftUtil.returnClient((TServiceClient) client);
+      }
+    } catch (TApplicationException tae) {
+      throw new AccumuloServerException(server, tae);
+    } catch (TooManyFilesException e) {
+      log.debug("Tablet (" + extent + ") has too many files " + server + " : " + e);
+      // keep the original failure as the cause so callers can tell why the batch could not be fetched
+      throw new AccumuloException("getBatchFromServer: failed", e);
+    } catch (TException e) {
+      log.debug("Error getting transport to " + server + " : " + e);
+      throw new AccumuloException("getBatchFromServer: failed", e);
+    } catch (ThriftSecurityException e) {
+      log.warn("Security Violation in scan request to " + server + ": " + e);
+      throw new AccumuloSecurityException(e.user, e.code, e);
+    }
+  }
+  
+  /**
+   * Mutable state of a multi-batch scan: where to locate the next tablet ({@code startRow}/{@code skipStartRow}),
+   * the remaining {@code range}, the current server-side scan session ({@code scanID}, {@code prevLoc}) and whether
+   * the scan is {@code finished}.
+   */
+  public static class ScanState {
+    
+    boolean isolated;
+    Text tableName;
+    Text startRow;
+    boolean skipStartRow;
+    
+    Range range;
+    
+    int size;
+    
+    AuthInfo credentials;
+    Authorizations authorizations;
+    List<Column> columns;
+    
+    TabletLocation prevLoc;
+    Long scanID;
+    
+    boolean finished = false;
+    
+    List<IterInfo> serverSideIteratorList;
+    
+    Map<String,Map<String,String>> serverSideIteratorOptions;
+    
+    /**
+     * Creates scan state positioned at the row of {@code range}'s start key, or the row of {@code new Key()} when the
+     * range has no start key. The fetched columns are copied into a new list in iteration order.
+     */
+    public ScanState(AuthInfo credentials, Text tableName, Authorizations authorizations, Range range, SortedSet<Column> fetchedColumns, int size,
+        List<IterInfo> serverSideIteratorList, Map<String,Map<String,String>> serverSideIteratorOptions, boolean isolated) {
+      this.credentials = credentials;
+      this.authorizations = authorizations;
+      
+      columns = new ArrayList<Column>(fetchedColumns.size());
+      for (Column column : fetchedColumns) {
+        columns.add(column);
+      }
+      
+      this.tableName = tableName;
+      this.range = range;
+      
+      Key startKey = range.getStartKey();
+      if (startKey == null) {
+        startKey = new Key();
+      }
+      this.startRow = startKey.getRow();
+      
+      this.skipStartRow = false;
+      
+      this.size = size;
+      
+      this.serverSideIteratorList = serverSideIteratorList;
+      this.serverSideIteratorOptions = serverSideIteratorOptions;
+      
+      this.isolated = isolated;
+      
+    }
+  }
+  
+  /** Thrown by {@link ThriftScanner#scan} when the elapsed time exceeds the caller's timeout. */
+  public static class ScanTimedOutException extends IOException {
+    
+    private static final long serialVersionUID = 1L;
+    
+  }
+  
+  /**
+   * Returns the next batch of results for {@code scanState}, locating tablets and retrying on recoverable errors
+   * (unlocatable tablet, not-serving tablet, Thrift errors, lost scan id, too many files) until a batch is obtained
+   * or the scan is finished.
+   * 
+   * @param timeOut
+   *          maximum elapsed time in seconds before {@link ScanTimedOutException} is thrown
+   * @return the next batch, or {@code null} if the scan finished with no further results
+   * @throws TableDeletedException
+   *           if the table no longer exists
+   * @throws TableOfflineException
+   *           if no tablet can be located and the table is offline
+   * @throws IsolationException
+   *           if an isolated scan hits an error that forces a new scan session
+   */
+  public static List<KeyValue> scan(Instance instance, AuthInfo credentials, ScanState scanState, int timeOut, AccumuloConfiguration conf)
+      throws ScanTimedOutException, AccumuloException, AccumuloSecurityException, TableNotFoundException {
+    TabletLocation loc = null;
+    
+    long startTime = System.currentTimeMillis();
+    String lastError = null;
+    String error = null;
+    int tooManyFilesCount = 0;
+    
+    List<KeyValue> results = null;
+    
+    Span span = Trace.start("scan");
+    try {
+      while (results == null && !scanState.finished) {
+        
+        if ((System.currentTimeMillis() - startTime) / 1000.0 > timeOut)
+          throw new ScanTimedOutException();
+        
+        while (loc == null) {
+          long currentTime = System.currentTimeMillis();
+          if ((currentTime - startTime) / 1000.0 > timeOut)
+            throw new ScanTimedOutException();
+          
+          Span locateSpan = Trace.start("scan:locateTablet");
+          try {
+            loc = TabletLocator.getInstance(instance, credentials, scanState.tableName).locateTablet(scanState.startRow, scanState.skipStartRow, false);
+            if (loc == null) {
+              if (!Tables.exists(instance, scanState.tableName.toString()))
+                throw new TableDeletedException(scanState.tableName.toString());
+              else if (Tables.getTableState(instance, scanState.tableName.toString()) == TableState.OFFLINE)
+                throw new TableOfflineException(instance, scanState.tableName.toString());
+              
+              // repeated identical errors are demoted to trace level to avoid flooding the debug log
+              error = "Failed to locate tablet for table : " + scanState.tableName + " row : " + scanState.startRow;
+              if (!error.equals(lastError))
+                log.debug(error);
+              else if (log.isTraceEnabled())
+                log.trace(error);
+              lastError = error;
+              UtilWaitThread.sleep(100);
+            } else {
+              // when a tablet splits we do want to continue scanning the low child
+              // of the split if we are already passed it
+              Range dataRange = loc.tablet_extent.toDataRange();
+              
+              if (scanState.range.getStartKey() != null && dataRange.afterEndKey(scanState.range.getStartKey())) {
+                // go to the next tablet
+                scanState.startRow = loc.tablet_extent.getEndRow();
+                scanState.skipStartRow = true;
+                loc = null;
+              } else if (scanState.range.getEndKey() != null && dataRange.beforeStartKey(scanState.range.getEndKey())) {
+                // should not happen
+                throw new RuntimeException("Unexpected tablet, extent : " + loc.tablet_extent + "  range : " + scanState.range + " startRow : "
+                    + scanState.startRow);
+              }
+            }
+          } catch (AccumuloServerException e) {
+            log.debug("Scan failed, server side exception : " + e.getMessage());
+            throw e;
+          } catch (AccumuloException e) {
+            error = "exception from tablet loc " + e.getMessage();
+            if (!error.equals(lastError))
+              log.debug(error);
+            else if (log.isTraceEnabled())
+              log.trace(error);
+            
+            lastError = error;
+            UtilWaitThread.sleep(100);
+          } finally {
+            locateSpan.stop();
+          }
+        }
+        
+        Span scanLocation = Trace.start("scan:location");
+        scanLocation.data("tserver", loc.tablet_location);
+        try {
+          results = scan(loc, scanState, conf);
+        } catch (AccumuloSecurityException e) {
+          // a security failure may really mean the table was deleted; refresh the table cache to tell the two apart
+          Tables.clearCache(instance);
+          if (!Tables.exists(instance, scanState.tableName.toString()))
+            throw new TableDeletedException(scanState.tableName.toString());
+          throw e;
+        } catch (TApplicationException tae) {
+          throw new AccumuloServerException(loc.tablet_location, tae);
+        } catch (NotServingTabletException e) {
+          error = "Scan failed, not serving tablet " + loc;
+          if (!error.equals(lastError))
+            log.debug(error);
+          else if (log.isTraceEnabled())
+            log.trace(error);
+          lastError = error;
+          
+          TabletLocator.getInstance(instance, credentials, scanState.tableName).invalidateCache(loc.tablet_extent);
+          loc = null;
+          
+          // no need to try the current scan id somewhere else
+          scanState.scanID = null;
+          
+          if (scanState.isolated)
+            throw new IsolationException();
+          
+          UtilWaitThread.sleep(100);
+        } catch (TException e) {
+          TabletLocator.getInstance(instance, credentials, scanState.tableName).invalidateCache(loc.tablet_location);
+          error = "Scan failed, thrift error " + e.getClass().getName() + "  " + e.getMessage() + " " + loc;
+          if (!error.equals(lastError))
+            log.debug(error);
+          else if (log.isTraceEnabled())
+            log.trace(error);
+          lastError = error;
+          loc = null;
+          
+          // do not want to continue using the same scan id, if a timeout occurred could cause a batch to be skipped
+          // because a thread on the server side may still be processing the timed out continue scan
+          scanState.scanID = null;
+          
+          if (scanState.isolated)
+            throw new IsolationException();
+          
+          UtilWaitThread.sleep(100);
+        } catch (NoSuchScanIDException e) {
+          error = "Scan failed, no such scan id " + scanState.scanID + " " + loc;
+          if (!error.equals(lastError))
+            log.debug(error);
+          else if (log.isTraceEnabled())
+            log.trace(error);
+          lastError = error;
+          
+          if (scanState.isolated)
+            throw new IsolationException();
+          
+          scanState.scanID = null;
+        } catch (TooManyFilesException e) {
+          error = "Tablet has too many files " + loc + " retrying...";
+          if (!error.equals(lastError)) {
+            log.debug(error);
+            tooManyFilesCount = 0;
+          } else {
+            // escalate to a warning once the same tablet has failed this way 300 consecutive times
+            tooManyFilesCount++;
+            if (tooManyFilesCount == 300)
+              log.warn(error);
+            else if (log.isTraceEnabled())
+              log.trace(error);
+          }
+          lastError = error;
+          
+          // not sure what state the scan session on the server side is
+          // in after this occurs, so lets be cautious and start a new
+          // scan session
+          scanState.scanID = null;
+          
+          if (scanState.isolated)
+            throw new IsolationException();
+          
+          UtilWaitThread.sleep(100);
+        } finally {
+          scanLocation.stop();
+        }
+      }
+      
+      // an empty final batch is reported to the caller as null (end of scan)
+      if (results != null && results.size() == 0 && scanState.finished) {
+        results = null;
+      }
+      
+      return results;
+    } finally {
+      span.stop();
+    }
+  }
+  
+  /**
+   * Performs one startScan (when there is no live scan id for {@code loc}) or continueScan RPC against {@code loc} and
+   * advances {@code scanState}: the scan id, previous location, start row, remaining range and finished flag.
+   * 
+   * @return the decoded batch, or {@code null} if {@code scanState} was already finished
+   */
+  private static List<KeyValue> scan(TabletLocation loc, ScanState scanState, AccumuloConfiguration conf) throws AccumuloSecurityException,
+      NotServingTabletException, TException, NoSuchScanIDException, TooManyFilesException {
+    if (scanState.finished)
+      return null;
+    
+    OpTimer opTimer = new OpTimer(log, Level.TRACE);
+    
+    TInfo tinfo = Tracer.traceInfo();
+    TabletClientService.Client client = ThriftUtil.getTServerClient(loc.tablet_location, conf);
+    
+    // the thread name is temporarily replaced with a description of the RPC and restored in the finally block
+    String old = Thread.currentThread().getName();
+    try {
+      ScanResult sr;
+      
+      // a scan id is only meaningful on the server/tablet that issued it
+      if (scanState.prevLoc != null && !scanState.prevLoc.equals(loc))
+        scanState.scanID = null;
+      
+      scanState.prevLoc = loc;
+      
+      if (scanState.scanID == null) {
+        String msg = "Starting scan tserver=" + loc.tablet_location + " tablet=" + loc.tablet_extent + " range=" + scanState.range + " ssil="
+            + scanState.serverSideIteratorList + " ssio=" + scanState.serverSideIteratorOptions;
+        Thread.currentThread().setName(msg);
+        opTimer.start(msg);
+        
+        TabletType ttype = TabletType.type(loc.tablet_extent);
+        boolean waitForWrites = !serversWaitedForWrites.get(ttype).contains(loc.tablet_location);
+        InitialScan is = client.startScan(tinfo, scanState.credentials, loc.tablet_extent.toThrift(), scanState.range.toThrift(),
+            Translator.translate(scanState.columns, Translator.CT), scanState.size, scanState.serverSideIteratorList, scanState.serverSideIteratorOptions,
+            scanState.authorizations.getAuthorizationsBB(), waitForWrites, scanState.isolated);
+        if (waitForWrites)
+          serversWaitedForWrites.get(ttype).add(loc.tablet_location);
+        
+        sr = is.result;
+        
+        if (sr.more)
+          scanState.scanID = is.scanID;
+        else
+          client.closeScan(tinfo, is.scanID);
+        
+      } else {
+        // log.debug("Calling continue scan : "+scanState.range+"  loc = "+loc);
+        String msg = "Continuing scan tserver=" + loc.tablet_location + " scanid=" + scanState.scanID;
+        Thread.currentThread().setName(msg);
+        opTimer.start(msg);
+        
+        sr = client.continueScan(tinfo, scanState.scanID);
+        if (!sr.more) {
+          client.closeScan(tinfo, scanState.scanID);
+          scanState.scanID = null;
+        }
+      }
+      
+      if (!sr.more) {
+        // this tablet is exhausted: either the whole scan is done or the next tablet starts after this tablet's end row
+        // log.debug("No more : tab end row = "+loc.tablet_extent.getEndRow()+" range = "+scanState.range);
+        if (loc.tablet_extent.getEndRow() == null) {
+          scanState.finished = true;
+          opTimer.stop("Completely finished scan in %DURATION% #results=" + sr.results.size());
+        } else if (scanState.range.getEndKey() == null || !scanState.range.afterEndKey(new Key(loc.tablet_extent.getEndRow()).followingKey(PartialKey.ROW))) {
+          scanState.startRow = loc.tablet_extent.getEndRow();
+          scanState.skipStartRow = true;
+          opTimer.stop("Finished scanning tablet in %DURATION% #results=" + sr.results.size());
+        } else {
+          scanState.finished = true;
+          opTimer.stop("Completely finished scan in %DURATION% #results=" + sr.results.size());
+        }
+      } else {
+        opTimer.stop("Finished scan in %DURATION% #results=" + sr.results.size() + " scanid=" + scanState.scanID);
+      }
+      
+      Key.decompress(sr.results);
+      
+      // resume after the last key returned (exclusive) so a restarted scan session does not repeat data
+      if (sr.results.size() > 0 && !scanState.finished)
+        scanState.range = new Range(new Key(sr.results.get(sr.results.size() - 1).key), false, scanState.range.getEndKey(), scanState.range.isEndKeyInclusive());
+      
+      List<KeyValue> results = new ArrayList<KeyValue>(sr.results.size());
+      for (TKeyValue tkv : sr.results)
+        results.add(new KeyValue(new Key(tkv.key), tkv.value));
+      
+      return results;
+      
+    } catch (ThriftSecurityException e) {
+      throw new AccumuloSecurityException(e.user, e.code, e);
+    } finally {
+      ThriftUtil.returnClient((TServiceClient) client);
+      Thread.currentThread().setName(old);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7bdbfccb/1.5/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportKey.java
----------------------------------------------------------------------
diff --git a/1.5/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportKey.java b/1.5/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportKey.java
new file mode 100644
index 0000000..aeecee0
--- /dev/null
+++ b/1.5/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportKey.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.client.impl;
+
+import org.apache.accumulo.core.util.ArgumentChecker;
+
+class ThriftTransportKey {
+  private final String location;
+  private final int port;
+  private final long timeout;
+  
+  private int hash = -1;
+  
+  ThriftTransportKey(String location, int port, long timeout) {
+    ArgumentChecker.notNull(location);
+    this.location = location;
+    this.port = port;
+    this.timeout = timeout;
+  }
+  
+  String getLocation() {
+    return location;
+  }
+  
+  int getPort() {
+    return port;
+  }
+  
+  long getTimeout() {
+    return timeout;
+  }
+  
+  @Override
+  public boolean equals(Object o) {
+    if (!(o instanceof ThriftTransportKey))
+      return false;
+    ThriftTransportKey ttk = (ThriftTransportKey) o;
+    return location.equals(ttk.location) && port == ttk.port && timeout == ttk.timeout;
+  }
+  
+  @Override
+  public int hashCode() {
+    if (hash == -1)
+      hash = (location + Integer.toString(port) + Long.toString(timeout)).hashCode();
+    return hash;
+  }
+  
+  @Override
+  public String toString() {
+    return location + ":" + Integer.toString(port) + " (" + Long.toString(timeout) + ")";
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7bdbfccb/1.5/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java
----------------------------------------------------------------------
diff --git a/1.5/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java b/1.5/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java
new file mode 100644
index 0000000..ef3724b
--- /dev/null
+++ b/1.5/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java
@@ -0,0 +1,607 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.client.impl;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.security.SecurityPermission;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.util.AddressUtil;
+import org.apache.accumulo.core.util.Daemon;
+import org.apache.accumulo.core.util.Pair;
+import org.apache.accumulo.core.util.TTimeoutTransport;
+import org.apache.accumulo.core.util.ThriftUtil;
+import org.apache.log4j.Logger;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+
+public class ThriftTransportPool {
+  private static SecurityPermission TRANSPORT_POOL_PERMISSION = new SecurityPermission("transportPoolPermission");
+  
+  private static final Random random = new Random();
+  private long killTime = 1000 * 3;
+  
+  private Map<ThriftTransportKey,List<CachedConnection>> cache = new HashMap<ThriftTransportKey,List<CachedConnection>>();
+  private Map<ThriftTransportKey,Long> errorCount = new HashMap<ThriftTransportKey,Long>();
+  private Map<ThriftTransportKey,Long> errorTime = new HashMap<ThriftTransportKey,Long>();
+  private Set<ThriftTransportKey> serversWarnedAbout = new HashSet<ThriftTransportKey>();
+  
+  private static final Logger log = Logger.getLogger(ThriftTransportPool.class);
+  
+  private static final Long ERROR_THRESHOLD = 20l;
+  private static final int STUCK_THRESHOLD = 2 * 60 * 1000;
+  
+  private static class CachedConnection {
+    
+    public CachedConnection(CachedTTransport t) {
+      this.transport = t;
+    }
+    
+    void setReserved(boolean reserved) {
+      this.transport.setReserved(reserved);
+    }
+    
+    boolean isReserved() {
+      return this.transport.reserved;
+    }
+    
+    CachedTTransport transport;
+    
+    long lastReturnTime;
+  }
+  
+  private static class Closer implements Runnable {
+    ThriftTransportPool pool;
+    
+    public Closer(ThriftTransportPool pool) {
+      this.pool = pool;
+    }
+    
+    public void run() {
+      while (true) {
+        
+        ArrayList<CachedConnection> connectionsToClose = new ArrayList<CachedConnection>();
+        
+        synchronized (pool) {
+          for (List<CachedConnection> ccl : pool.cache.values()) {
+            Iterator<CachedConnection> iter = ccl.iterator();
+            while (iter.hasNext()) {
+              CachedConnection cachedConnection = iter.next();
+              
+              if (!cachedConnection.isReserved() && System.currentTimeMillis() - cachedConnection.lastReturnTime > pool.killTime) {
+                connectionsToClose.add(cachedConnection);
+                iter.remove();
+              }
+            }
+          }
+          
+          for (List<CachedConnection> ccl : pool.cache.values()) {
+            for (CachedConnection cachedConnection : ccl) {
+              cachedConnection.transport.checkForStuckIO(STUCK_THRESHOLD);
+            }
+          }
+          
+          Iterator<Entry<ThriftTransportKey,Long>> iter = pool.errorTime.entrySet().iterator();
+          while (iter.hasNext()) {
+            Entry<ThriftTransportKey,Long> entry = iter.next();
+            long delta = System.currentTimeMillis() - entry.getValue();
+            if (delta >= STUCK_THRESHOLD) {
+              pool.errorCount.remove(entry.getKey());
+              iter.remove();
+            }
+          }
+        }
+        
+        // close connections outside of sync block
+        for (CachedConnection cachedConnection : connectionsToClose) {
+          cachedConnection.transport.close();
+        }
+        
+        try {
+          Thread.sleep(500);
+        } catch (InterruptedException e) {
+          e.printStackTrace();
+        }
+      }
+    }
+  }
+  
+  static class CachedTTransport extends TTransport {
+    
+    private ThriftTransportKey cacheKey;
+    private TTransport wrappedTransport;
+    private boolean sawError = false;
+    
+    private volatile String ioThreadName = null;
+    private volatile long ioStartTime = 0;
+    private volatile boolean reserved = false;
+    
+    private String stuckThreadName = null;
+    
+    int ioCount = 0;
+    int lastIoCount = -1;
+    
+    private void sawError(Exception e) {
+      sawError = true;
+    }
+    
+    final void setReserved(boolean reserved) {
+      this.reserved = reserved;
+      if (reserved) {
+        ioThreadName = Thread.currentThread().getName();
+        ioCount = 0;
+        lastIoCount = -1;
+      } else {
+        if ((ioCount & 1) == 1) {
+          // connection unreserved, but it seems io may still be
+          // happening
+          log.warn("Connection returned to thrift connection pool that may still be in use " + ioThreadName + " " + Thread.currentThread().getName(),
+              new Exception());
+        }
+        
+        ioCount = 0;
+        lastIoCount = -1;
+        ioThreadName = null;
+      }
+      checkForStuckIO(STUCK_THRESHOLD);
+    }
+    
+    final void checkForStuckIO(long threshold) {
+      /*
+       * checking for stuck io needs to be light weight.
+       * 
+       * Tried to call System.currentTimeMillis() and Thread.currentThread() before every io operation.... this dramatically slowed things down. So switched to
+       * incrementing a counter before and after each io operation.
+       */
+      
+      if ((ioCount & 1) == 1) {
+        // when ioCount is odd, it means I/O is currently happening
+        if (ioCount == lastIoCount) {
+          // still doing same I/O operation as last time this
+          // functions was called
+          long delta = System.currentTimeMillis() - ioStartTime;
+          if (delta >= threshold && stuckThreadName == null) {
+            stuckThreadName = ioThreadName;
+            log.warn("Thread \"" + ioThreadName + "\" stuck on IO  to " + cacheKey + " for at least " + delta + " ms");
+          }
+        } else {
+          // remember this ioCount and the time we saw it, need to see
+          // if it changes
+          lastIoCount = ioCount;
+          ioStartTime = System.currentTimeMillis();
+          
+          if (stuckThreadName != null) {
+            // doing I/O, but ioCount changed so no longer stuck
+            log.info("Thread \"" + stuckThreadName + "\" no longer stuck on IO  to " + cacheKey + " sawError = " + sawError);
+            stuckThreadName = null;
+          }
+        }
+      } else {
+        // I/O is not currently happening
+        if (stuckThreadName != null) {
+          // no longer stuck, and was stuck in the past
+          log.info("Thread \"" + stuckThreadName + "\" no longer stuck on IO  to " + cacheKey + " sawError = " + sawError);
+          stuckThreadName = null;
+        }
+      }
+    }
+    
+    public CachedTTransport(TTransport transport, ThriftTransportKey cacheKey2) {
+      this.wrappedTransport = transport;
+      this.cacheKey = cacheKey2;
+    }
+    
+    public boolean isOpen() {
+      return wrappedTransport.isOpen();
+    }
+    
+    public void open() throws TTransportException {
+      try {
+        ioCount++;
+        wrappedTransport.open();
+      } catch (TTransportException tte) {
+        sawError(tte);
+        throw tte;
+      } finally {
+        ioCount++;
+      }
+    }
+    
+    public int read(byte[] arg0, int arg1, int arg2) throws TTransportException {
+      try {
+        ioCount++;
+        return wrappedTransport.read(arg0, arg1, arg2);
+      } catch (TTransportException tte) {
+        sawError(tte);
+        throw tte;
+      } finally {
+        ioCount++;
+      }
+    }
+    
+    public int readAll(byte[] arg0, int arg1, int arg2) throws TTransportException {
+      try {
+        ioCount++;
+        return wrappedTransport.readAll(arg0, arg1, arg2);
+      } catch (TTransportException tte) {
+        sawError(tte);
+        throw tte;
+      } finally {
+        ioCount++;
+      }
+    }
+    
+    public void write(byte[] arg0, int arg1, int arg2) throws TTransportException {
+      try {
+        ioCount++;
+        wrappedTransport.write(arg0, arg1, arg2);
+      } catch (TTransportException tte) {
+        sawError(tte);
+        throw tte;
+      } finally {
+        ioCount++;
+      }
+    }
+    
+    public void write(byte[] arg0) throws TTransportException {
+      try {
+        ioCount++;
+        wrappedTransport.write(arg0);
+      } catch (TTransportException tte) {
+        sawError(tte);
+        throw tte;
+      } finally {
+        ioCount++;
+      }
+    }
+    
+    public void close() {
+      try {
+        ioCount++;
+        wrappedTransport.close();
+      } finally {
+        ioCount++;
+      }
+      
+    }
+    
+    public void flush() throws TTransportException {
+      try {
+        ioCount++;
+        wrappedTransport.flush();
+      } catch (TTransportException tte) {
+        sawError(tte);
+        throw tte;
+      } finally {
+        ioCount++;
+      }
+    }
+    
+    public boolean peek() {
+      try {
+        ioCount++;
+        return wrappedTransport.peek();
+      } finally {
+        ioCount++;
+      }
+    }
+    
+    public byte[] getBuffer() {
+      try {
+        ioCount++;
+        return wrappedTransport.getBuffer();
+      } finally {
+        ioCount++;
+      }
+    }
+    
+    public int getBufferPosition() {
+      try {
+        ioCount++;
+        return wrappedTransport.getBufferPosition();
+      } finally {
+        ioCount++;
+      }
+    }
+    
+    public int getBytesRemainingInBuffer() {
+      try {
+        ioCount++;
+        return wrappedTransport.getBytesRemainingInBuffer();
+      } finally {
+        ioCount++;
+      }
+    }
+    
+    public void consumeBuffer(int len) {
+      try {
+        ioCount++;
+        wrappedTransport.consumeBuffer(len);
+      } finally {
+        ioCount++;
+      }
+    }
+    
+    public ThriftTransportKey getCacheKey() {
+      return cacheKey;
+    }
+    
+  }
+  
+  private ThriftTransportPool() {}
+  
+  public TTransport getTransport(String location, int port) throws TTransportException {
+    return getTransport(location, port, 0);
+  }
+  
+  public TTransport getTransportWithDefaultTimeout(InetSocketAddress addr, AccumuloConfiguration conf) throws TTransportException {
+    return getTransport(addr.getAddress().getHostAddress(), addr.getPort(), conf.getTimeInMillis(Property.GENERAL_RPC_TIMEOUT));
+  }
+  
+  public TTransport getTransport(InetSocketAddress addr, long timeout) throws TTransportException {
+    return getTransport(addr.getAddress().getHostAddress(), addr.getPort(), timeout);
+  }
+  
+  public TTransport getTransportWithDefaultTimeout(String location, int port, AccumuloConfiguration conf) throws TTransportException {
+    return getTransport(location, port, conf.getTimeInMillis(Property.GENERAL_RPC_TIMEOUT));
+  }
+  
+  Pair<String,TTransport> getAnyTransport(List<ThriftTransportKey> servers, boolean preferCachedConnection) throws TTransportException {
+    
+    servers = new ArrayList<ThriftTransportKey>(servers);
+    
+    if (preferCachedConnection) {
+      HashSet<ThriftTransportKey> serversSet = new HashSet<ThriftTransportKey>(servers);
+      
+      synchronized (this) {
+        
+        // randomly pick a server from the connection cache
+        serversSet.retainAll(cache.keySet());
+        
+        if (serversSet.size() > 0) {
+          ArrayList<ThriftTransportKey> cachedServers = new ArrayList<ThriftTransportKey>(serversSet);
+          Collections.shuffle(cachedServers, random);
+          
+          for (ThriftTransportKey ttk : cachedServers) {
+            for (CachedConnection cachedConnection : cache.get(ttk)) {
+              if (!cachedConnection.isReserved()) {
+                cachedConnection.setReserved(true);
+                if (log.isTraceEnabled())
+                  log.trace("Using existing connection to " + ttk.getLocation() + ":" + ttk.getPort());
+                return new Pair<String,TTransport>(ttk.getLocation() + ":" + ttk.getPort(), cachedConnection.transport);
+              }
+            }
+          }
+        }
+      }
+    }
+    
+    int retryCount = 0;
+    while (servers.size() > 0 && retryCount < 10) {
+      int index = random.nextInt(servers.size());
+      ThriftTransportKey ttk = servers.get(index);
+      
+      if (!preferCachedConnection) {
+        synchronized (this) {
+          List<CachedConnection> cachedConnList = cache.get(ttk);
+          if (cachedConnList != null) {
+            for (CachedConnection cachedConnection : cachedConnList) {
+              if (!cachedConnection.isReserved()) {
+                cachedConnection.setReserved(true);
+                if (log.isTraceEnabled())
+                  log.trace("Using existing connection to " + ttk.getLocation() + ":" + ttk.getPort());
+                return new Pair<String,TTransport>(ttk.getLocation() + ":" + ttk.getPort(), cachedConnection.transport);
+              }
+            }
+          }
+        }
+      }
+
+      try {
+        return new Pair<String,TTransport>(ttk.getLocation() + ":" + ttk.getPort(), createNewTransport(ttk));
+      } catch (TTransportException tte) {
+        log.debug("Failed to connect to " + servers.get(index), tte);
+        servers.remove(index);
+        retryCount++;
+      }
+    }
+    
+    throw new TTransportException("Failed to connect to a server");
+  }
+  
+  public TTransport getTransport(String location, int port, long milliseconds) throws TTransportException {
+    return getTransport(new ThriftTransportKey(location, port, milliseconds));
+  }
+  
+  private TTransport getTransport(ThriftTransportKey cacheKey) throws TTransportException {
+    synchronized (this) {
+      // atomically reserve location if it exist in cache
+      List<CachedConnection> ccl = cache.get(cacheKey);
+      
+      if (ccl == null) {
+        ccl = new LinkedList<CachedConnection>();
+        cache.put(cacheKey, ccl);
+      }
+      
+      for (CachedConnection cachedConnection : ccl) {
+        if (!cachedConnection.isReserved()) {
+          cachedConnection.setReserved(true);
+          if (log.isTraceEnabled())
+            log.trace("Using existing connection to " + cacheKey.getLocation() + ":" + cacheKey.getPort());
+          return cachedConnection.transport;
+        }
+      }
+    }
+    
+    return createNewTransport(cacheKey);
+  }
+  
+  private TTransport createNewTransport(ThriftTransportKey cacheKey) throws TTransportException {
+    TTransport transport;
+    if (cacheKey.getTimeout() == 0) {
+      transport = AddressUtil.createTSocket(cacheKey.getLocation(), cacheKey.getPort());
+    } else {
+      try {
+        transport = TTimeoutTransport.create(AddressUtil.parseAddress(cacheKey.getLocation(), cacheKey.getPort()), cacheKey.getTimeout());
+      } catch (IOException ex) {
+        throw new TTransportException(ex);
+      }
+    }
+    transport = ThriftUtil.transportFactory().getTransport(transport);
+    transport.open();
+    
+    if (log.isTraceEnabled())
+      log.trace("Creating new connection to connection to " + cacheKey.getLocation() + ":" + cacheKey.getPort());
+    
+    CachedTTransport tsc = new CachedTTransport(transport, cacheKey);
+    
+    CachedConnection cc = new CachedConnection(tsc);
+    cc.setReserved(true);
+    
+    synchronized (this) {
+      List<CachedConnection> ccl = cache.get(cacheKey);
+      
+      if (ccl == null) {
+        ccl = new LinkedList<CachedConnection>();
+        cache.put(cacheKey, ccl);
+      }
+      
+      ccl.add(cc);
+    }
+    return cc.transport;
+  }
+  
+  public void returnTransport(TTransport tsc) {
+    if (tsc == null) {
+      return;
+    }
+    
+    boolean existInCache = false;
+    CachedTTransport ctsc = (CachedTTransport) tsc;
+    
+    ArrayList<CachedConnection> closeList = new ArrayList<ThriftTransportPool.CachedConnection>();
+
+    synchronized (this) {
+      List<CachedConnection> ccl = cache.get(ctsc.getCacheKey());
+      for (Iterator<CachedConnection> iterator = ccl.iterator(); iterator.hasNext();) {
+        CachedConnection cachedConnection = iterator.next();
+        if (cachedConnection.transport == tsc) {
+          if (ctsc.sawError) {
+            closeList.add(cachedConnection);
+            iterator.remove();
+            
+            if (log.isTraceEnabled())
+              log.trace("Returned connection had error " + ctsc.getCacheKey());
+            
+            Long ecount = errorCount.get(ctsc.getCacheKey());
+            if (ecount == null)
+              ecount = 0l;
+            ecount++;
+            errorCount.put(ctsc.getCacheKey(), ecount);
+            
+            Long etime = errorTime.get(ctsc.getCacheKey());
+            if (etime == null) {
+              errorTime.put(ctsc.getCacheKey(), System.currentTimeMillis());
+            }
+            
+            if (ecount >= ERROR_THRESHOLD && !serversWarnedAbout.contains(ctsc.getCacheKey())) {
+              log.warn("Server " + ctsc.getCacheKey() + " had " + ecount + " failures in a short time period, will not complain anymore ");
+              serversWarnedAbout.add(ctsc.getCacheKey());
+            }
+            
+            cachedConnection.setReserved(false);
+            
+          } else {
+            
+            if (log.isTraceEnabled())
+              log.trace("Returned connection " + ctsc.getCacheKey() + " ioCount : " + cachedConnection.transport.ioCount);
+            
+            cachedConnection.lastReturnTime = System.currentTimeMillis();
+            cachedConnection.setReserved(false);
+          }
+          existInCache = true;
+          break;
+        }
+      }
+      
+      // remove all unreserved cached connection when a sever has an error, not just the connection that was returned
+      if (ctsc.sawError) {
+        for (Iterator<CachedConnection> iterator = ccl.iterator(); iterator.hasNext();) {
+          CachedConnection cachedConnection = iterator.next();
+          if (!cachedConnection.isReserved()) {
+            closeList.add(cachedConnection);
+            iterator.remove();
+          }
+        }
+      }
+    }
+    
+    // close outside of sync block
+    for (CachedConnection cachedConnection : closeList) {
+      try {
+        cachedConnection.transport.close();
+      } catch (Exception e) {
+        log.debug("Failed to close connection w/ errors", e);
+      }
+    }
+    
+    if (!existInCache) {
+      log.warn("Returned tablet server connection to cache that did not come from cache");
+      // close outside of sync block
+      tsc.close();
+    }
+  }
+  
+  /**
+   * Set the time after which idle connections should be closed
+   * 
+   * @param time
+   */
+  public synchronized void setIdleTime(long time) {
+    this.killTime = time;
+    log.debug("Set thrift transport pool idle time to " + time);
+  }
+
+  private static ThriftTransportPool instance = new ThriftTransportPool();
+  private static final AtomicBoolean daemonStarted = new AtomicBoolean(false);
+  
+  public static ThriftTransportPool getInstance() {
+    SecurityManager sm = System.getSecurityManager();
+    if (sm != null) {
+      sm.checkPermission(TRANSPORT_POOL_PERMISSION);
+    }
+    
+    if (daemonStarted.compareAndSet(false, true)) {
+      new Daemon(new Closer(instance), "Thrift Connection Pool Checker").start();
+    }
+    return instance;
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7bdbfccb/1.5/core/src/main/java/org/apache/accumulo/core/client/impl/Translator.java
----------------------------------------------------------------------
diff --git a/1.5/core/src/main/java/org/apache/accumulo/core/client/impl/Translator.java b/1.5/core/src/main/java/org/apache/accumulo/core/client/impl/Translator.java
new file mode 100644
index 0000000..aa5a3be
--- /dev/null
+++ b/1.5/core/src/main/java/org/apache/accumulo/core/client/impl/Translator.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.client.impl;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.data.Column;
+import org.apache.accumulo.core.data.ConstraintViolationSummary;
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.thrift.TColumn;
+import org.apache.accumulo.core.data.thrift.TConstraintViolationSummary;
+import org.apache.accumulo.core.data.thrift.TKeyExtent;
+import org.apache.accumulo.core.data.thrift.TRange;
+
+/**
+ * Converts values of type {@code IT} into values of type {@code OT}. The nested implementations convert between Accumulo data
+ * classes and their Thrift counterparts. The static {@code translate} helpers apply a translator to every element of a collection
+ * or map.
+ */
+public abstract class Translator<IT,OT> {
+  
+  /** Converts a single value. */
+  public abstract OT translate(IT input);
+  
+  /** Thrift {@link TKeyExtent} to {@link KeyExtent}. */
+  public static class TKeyExtentTranslator extends Translator<TKeyExtent,KeyExtent> {
+    @Override
+    public KeyExtent translate(TKeyExtent input) {
+      return new KeyExtent(input);
+    }
+    
+  }
+  
+  /** {@link KeyExtent} to Thrift {@link TKeyExtent}. */
+  public static class KeyExtentTranslator extends Translator<KeyExtent,TKeyExtent> {
+    @Override
+    public TKeyExtent translate(KeyExtent input) {
+      return input.toThrift();
+    }
+  }
+  
+  /** Thrift {@link TConstraintViolationSummary} to {@link ConstraintViolationSummary}. */
+  public static class TCVSTranslator extends Translator<TConstraintViolationSummary,ConstraintViolationSummary> {
+    @Override
+    public ConstraintViolationSummary translate(TConstraintViolationSummary input) {
+      return new ConstraintViolationSummary(input);
+    }
+  }
+  
+  /** {@link ConstraintViolationSummary} to Thrift {@link TConstraintViolationSummary}. */
+  public static class CVSTranslator extends Translator<ConstraintViolationSummary,TConstraintViolationSummary> {
+    @Override
+    public TConstraintViolationSummary translate(ConstraintViolationSummary input) {
+      return input.toThrift();
+    }
+  }
+  
+  /** {@link Column} to Thrift {@link TColumn}. */
+  public static class ColumnTranslator extends Translator<Column,TColumn> {
+    @Override
+    public TColumn translate(Column input) {
+      return input.toThrift();
+    }
+  }
+  
+  /** Thrift {@link TRange} to {@link Range}. */
+  public static class TRangeTranslator extends Translator<TRange,Range> {
+    
+    @Override
+    public Range translate(TRange input) {
+      return new Range(input);
+    }
+    
+  }
+  
+  /** {@link Range} to Thrift {@link TRange}. */
+  public static class RangeTranslator extends Translator<Range,TRange> {
+    @Override
+    public TRange translate(Range input) {
+      return input.toThrift();
+    }
+  }
+  
+  /** Lifts an element translator to whole lists, producing a new list in the same order. */
+  public static class ListTranslator<IT,OT> extends Translator<List<IT>,List<OT>> {
+    
+    private final Translator<IT,OT> translator;
+    
+    public ListTranslator(Translator<IT,OT> translator) {
+      this.translator = translator;
+    }
+    
+    @Override
+    public List<OT> translate(List<IT> input) {
+      return translate(input, this.translator);
+    }
+    
+  }
+  
+  public static final TKeyExtentTranslator TKET = new TKeyExtentTranslator();
+  public static final TCVSTranslator TCVST = new TCVSTranslator();
+  public static final TRangeTranslator TRT = new TRangeTranslator();
+  
+  public static final KeyExtentTranslator KET = new KeyExtentTranslator();
+  public static final ColumnTranslator CT = new ColumnTranslator();
+  // declared type kept as Translator<Range,TRange> for binary compatibility with existing callers
+  public static final Translator<Range,TRange> RT = new RangeTranslator();
+  public static final CVSTranslator CVST = new CVSTranslator();
+  
+  /**
+   * Returns a new {@link HashMap} whose keys are translated by {@code keyTranslator} and whose values are copied unchanged. If two
+   * input keys translate to equal output keys, the entry iterated last wins.
+   */
+  public static <IKT,OKT,T> Map<OKT,T> translate(Map<IKT,T> input, Translator<IKT,OKT> keyTranslator) {
+    HashMap<OKT,T> output = new HashMap<OKT,T>();
+    
+    for (Entry<IKT,T> entry : input.entrySet())
+      output.put(keyTranslator.translate(entry.getKey()), entry.getValue());
+    
+    return output;
+  }
+  
+  /**
+   * Returns a new {@link HashMap} with every key translated by {@code keyTranslator} and every value by {@code valueTranslator}. If
+   * two input keys translate to equal output keys, the entry iterated last wins.
+   */
+  public static <IKT,OKT,IVT,OVT> Map<OKT,OVT> translate(Map<IKT,IVT> input, Translator<IKT,OKT> keyTranslator, Translator<IVT,OVT> valueTranslator) {
+    HashMap<OKT,OVT> output = new HashMap<OKT,OVT>();
+    
+    for (Entry<IKT,IVT> entry : input.entrySet())
+      output.put(keyTranslator.translate(entry.getKey()), valueTranslator.translate(entry.getValue()));
+    
+    return output;
+  }
+  
+  /** Returns a new {@link ArrayList} holding each element of {@code input}, translated, in iteration order. */
+  public static <IT,OT> List<OT> translate(Collection<IT> input, Translator<IT,OT> translator) {
+    ArrayList<OT> output = new ArrayList<OT>(input.size());
+    
+    for (IT in : input)
+      output.add(translator.translate(in));
+    
+    return output;
+  }
+}


Mime
View raw message