accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ctubb...@apache.org
Subject [41/61] [abbrv] [partial] accumulo git commit: ACCUMULO-722 put trunk in my sandbox
Date Thu, 03 Mar 2016 22:00:06 GMT
http://git-wip-us.apache.org/repos/asf/accumulo/blob/7bdbfccb/1.5/core/src/main/java/org/apache/accumulo/core/client/impl/TabletLocatorImpl.java
----------------------------------------------------------------------
diff --git a/1.5/core/src/main/java/org/apache/accumulo/core/client/impl/TabletLocatorImpl.java b/1.5/core/src/main/java/org/apache/accumulo/core/client/impl/TabletLocatorImpl.java
new file mode 100644
index 0000000..ec8f95f
--- /dev/null
+++ b/1.5/core/src/main/java/org/apache/accumulo/core/client/impl/TabletLocatorImpl.java
@@ -0,0 +1,609 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.client.impl;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.PartialKey;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.util.OpTimer;
+import org.apache.accumulo.core.util.TextUtil;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+
+public class TabletLocatorImpl extends TabletLocator {
+  
+  // Logger used by this class for trace-level timing and cache invalidation messages.
+  private static final Logger log = Logger.getLogger(TabletLocatorImpl.class);
+  
+  // There seems to be a bug in TreeMap.tailMap related to putting null in
+  // the TreeMap. Therefore, instead of putting null, the cache stores the
+  // MAX_TEXT sentinel as the key of a tablet whose end row is null.
+  // EndRowComparator recognizes this exact instance by reference (==) and
+  // orders it after every other key.
+  static final Text MAX_TEXT = new Text();
+  
+  /**
+   * Orders tablet end rows. The MAX_TEXT sentinel is matched by reference and sorts after every other value; all other rows use their natural ordering.
+   */
+  private static class EndRowComparator implements Comparator<Text> {
+    
+    public int compare(Text o1, Text o2) {
+      boolean leftIsMax = o1 == MAX_TEXT;
+      boolean rightIsMax = o2 == MAX_TEXT;
+      
+      if (leftIsMax && rightIsMax)
+        return 0;
+      if (leftIsMax)
+        return 1;
+      if (rightIsMax)
+        return -1;
+      
+      return o1.compareTo(o2);
+    }
+    
+  }
+  
+  // Comparator instance used to order the keys of metaCache.
+  static final EndRowComparator endRowComparator = new EndRowComparator();
+  
+  // Id of the table this locator serves; updateCache rejects extents that belong to any other table.
+  protected Text tableId;
+  // Locator asked for the location of the metadata tablet describing a row of this table.
+  protected TabletLocator parent;
+  // Cached tablet locations keyed by tablet end row (MAX_TEXT stands in for a null end row).
+  protected TreeMap<Text,TabletLocation> metaCache = new TreeMap<Text,TabletLocation>(endRowComparator);
+  // Performs the metadata lookups that populate metaCache.
+  protected TabletLocationObtainer locationObtainer;
+  // tableId followed by '<'; passed as the stop row when looking up tablets.
+  protected Text lastTabletRow;
+  
+  // Extents reported through invalidateCache; processInvalidated evicts and re-looks them up.
+  private TreeSet<KeyExtent> badExtents = new TreeSet<KeyExtent>();
+  // Read/write lock taken around accesses to metaCache and badExtents.
+  private ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
+  private Lock rLock = rwLock.readLock();
+  private Lock wLock = rwLock.writeLock();
+  
+  /**
+   * Strategy used by TabletLocatorImpl to fetch tablet locations that are not (or no longer) in its cache.
+   */
+  public static interface TabletLocationObtainer {
+    /**
+     * Looks up tablet locations starting from a single metadata tablet. TabletLocatorImpl passes the metadata tablet location as {@code src}, the metadata row
+     * of the wanted row as {@code row} and the table's last metadata row as {@code stopRow}. NOTE(review): the exact scan semantics are defined by the
+     * implementations, which are not visible here.
+     */
+    List<TabletLocation> lookupTablet(TabletLocation src, Text row, Text stopRow, TabletLocator parent) throws AccumuloSecurityException, AccumuloException;
+    
+    /**
+     * Looks up tablet locations for a batch of metadata ranges hosted by one tablet server. TabletLocatorImpl calls this with the ranges it binned for
+     * {@code tserver} while re-resolving invalidated extents.
+     */
+    List<TabletLocation> lookupTablets(String tserver, Map<KeyExtent,List<Range>> map, TabletLocator parent) throws AccumuloSecurityException,
+        AccumuloException;
+  }
+  
+  /**
+   * Creates a locator for one table.
+   * 
+   * @param table
+   *          id of the table whose tablets are located
+   * @param parent
+   *          locator used to find the metadata tablets for this table
+   * @param tlo
+   *          performs the metadata lookups
+   */
+  public TabletLocatorImpl(Text table, TabletLocator parent, TabletLocationObtainer tlo) {
+    this.tableId = table;
+    this.parent = parent;
+    this.locationObtainer = tlo;
+    
+    // the stop row used for lookups is the table id followed by '<'
+    Text stopRow = new Text(table);
+    stopRow.append(new byte[] {'<'}, 0, 1);
+    this.lastTabletRow = stopRow;
+  }
+  
+  /**
+   * Groups mutations by the tablet server hosting the tablet that contains each mutation's row.
+   * 
+   * Mutations whose tablet is already cached are binned under the read lock. The rest are sorted by row and resolved under the write lock. As soon as one
+   * lookup fails, that mutation and every remaining one are added to {@code failures}.
+   * 
+   * @param mutations
+   *          mutations to bin
+   * @param binnedMutations
+   *          receives the mutations grouped by tablet server
+   * @param failures
+   *          receives the mutations for which no tablet location was found
+   */
+  @Override
+  public void binMutations(List<Mutation> mutations, Map<String,TabletServerMutations> binnedMutations, List<Mutation> failures) throws AccumuloException,
+      AccumuloSecurityException, TableNotFoundException {
+    
+    OpTimer opTimer = null;
+    if (log.isTraceEnabled())
+      opTimer = new OpTimer(log, Level.TRACE).start("Binning " + mutations.size() + " mutations for table " + tableId);
+    
+    List<Mutation> uncached = new ArrayList<Mutation>();
+    Text lookupRow = new Text();
+    
+    rLock.lock();
+    try {
+      processInvalidated();
+      
+      // Rows should be looked up in sorted order to be efficient, but always sorting is slow. So only the
+      // mutations missing from the cache get sorted, which is the order _locateTablet handles best.
+      
+      // Fine grained synchronization and fine grained logging are avoided on this path: the methods
+      // called from here are not synchronized and should not log.
+      
+      for (Mutation mutation : mutations) {
+        lookupRow.set(mutation.getRow());
+        TabletLocation cached = locateTabletInCache(lookupRow);
+        if (cached != null)
+          addMutation(binnedMutations, mutation, cached);
+        else
+          uncached.add(mutation);
+      }
+    } finally {
+      rLock.unlock();
+    }
+    
+    if (!uncached.isEmpty()) {
+      Collections.sort(uncached, new Comparator<Mutation>() {
+        public int compare(Mutation o1, Mutation o2) {
+          byte[] left = o1.getRow();
+          byte[] right = o2.getRow();
+          return WritableComparator.compareBytes(left, 0, left.length, right, 0, right.length);
+        }
+      });
+      
+      wLock.lock();
+      try {
+        Iterator<Mutation> remaining = uncached.iterator();
+        while (remaining.hasNext()) {
+          Mutation mutation = remaining.next();
+          lookupRow.set(mutation.getRow());
+          
+          TabletLocation located = _locateTablet(lookupRow, false, false, false);
+          if (located != null) {
+            addMutation(binnedMutations, mutation, located);
+            continue;
+          }
+          
+          // when one tablet does not return a location, something is probably
+          // screwy, go ahead and fail this mutation and everything after it.
+          failures.add(mutation);
+          while (remaining.hasNext())
+            failures.add(remaining.next());
+        }
+      } finally {
+        wLock.unlock();
+      }
+    }
+    
+    if (opTimer != null)
+      opTimer.stop("Binned " + mutations.size() + " mutations for table " + tableId + " to " + binnedMutations.size() + " tservers in %DURATION%");
+  }
+  
+  /**
+   * Adds a mutation to the per-server bucket of the tablet server named by {@code tl}, creating the bucket on first use.
+   */
+  private void addMutation(Map<String,TabletServerMutations> binnedMutations, Mutation mutation, TabletLocation tl) {
+    String server = tl.tablet_location;
+    
+    TabletServerMutations serverMutations = binnedMutations.get(server);
+    if (serverMutations == null) {
+      serverMutations = new TabletServerMutations();
+      binnedMutations.put(server, serverMutations);
+    }
+    
+    serverMutations.addMutation(tl.tablet_extent, mutation);
+  }
+  
+  /**
+   * Bins each range under every tablet it overlaps, walking from the tablet containing the range's start row to the tablet containing its end.
+   * 
+   * A range is binned only if every tablet along the walk could be located; otherwise it is returned as a failure and nothing is binned for it. When
+   * {@code useCache} is false, no further lookups are attempted after the first failed lookup, so all later ranges fail as well.
+   * 
+   * @param useCache
+   *          true to consult only the cache, false to perform lookups through _locateTablet
+   * @return the ranges that could not be binned
+   */
+  private List<Range> binRanges(List<Range> ranges, Map<String,Map<KeyExtent,List<Range>>> binnedRanges, boolean useCache) throws AccumuloException,
+      AccumuloSecurityException, TableNotFoundException {
+    List<Range> failures = new ArrayList<Range>();
+    List<TabletLocation> covering = new ArrayList<TabletLocation>();
+    
+    boolean lookupFailed = false;
+    
+    for (Range range : ranges) {
+      covering.clear();
+      
+      Text startRow = range.getStartKey() == null ? new Text() : range.getStartKey().getRow();
+      
+      TabletLocation current;
+      if (useCache)
+        current = locateTabletInCache(startRow);
+      else if (lookupFailed)
+        current = null;
+      else
+        current = _locateTablet(startRow, false, false, false);
+      
+      // walk forward tablet by tablet until the range is covered or a tablet cannot be found
+      while (current != null) {
+        covering.add(current);
+        
+        Text endRow = current.tablet_extent.getEndRow();
+        if (endRow == null || range.afterEndKey(new Key(endRow).followingKey(PartialKey.ROW)))
+          break;
+        
+        if (useCache) {
+          Text nextRow = new Text(endRow);
+          nextRow.append(new byte[] {0}, 0, 1);
+          current = locateTabletInCache(nextRow);
+        } else {
+          current = _locateTablet(endRow, true, false, false);
+        }
+      }
+      
+      if (current == null) {
+        failures.add(range);
+        if (!useCache)
+          lookupFailed = true;
+        continue;
+      }
+      
+      for (TabletLocation location : covering) {
+        TabletLocatorImpl.addRange(binnedRanges, location.tablet_location, location.tablet_extent, range);
+      }
+    }
+    
+    return failures;
+  }
+  
+  /**
+   * Bins ranges by tablet server and tablet. A first pass uses only the cache under the read lock; ranges that fail are sorted and retried with real lookups
+   * under the write lock.
+   * 
+   * @return the ranges that could not be binned
+   */
+  @Override
+  public List<Range> binRanges(List<Range> ranges, Map<String,Map<KeyExtent,List<Range>>> binnedRanges) throws AccumuloException, AccumuloSecurityException,
+      TableNotFoundException {
+    
+    /*
+     * Fine grained synchronization and fine grained logging are avoided on this path: the methods called from here are not synchronized and should not log.
+     */
+    
+    OpTimer opTimer = null;
+    if (log.isTraceEnabled())
+      opTimer = new OpTimer(log, Level.TRACE).start("Binning " + ranges.size() + " ranges for table " + tableId);
+    
+    List<Range> unbinned;
+    rLock.lock();
+    try {
+      processInvalidated();
+      
+      // Lookups for uncached ranges are best done in sorted order, but always
+      // sorting the input is wasteful. So bin from the cache first and only
+      // sort and retry whatever the cache could not answer.
+      unbinned = binRanges(ranges, binnedRanges, true);
+    } finally {
+      rLock.unlock();
+    }
+    
+    if (!unbinned.isEmpty()) {
+      // order the leftovers by range start key before looking them up
+      Collections.sort(unbinned);
+      
+      wLock.lock();
+      try {
+        unbinned = binRanges(unbinned, binnedRanges, false);
+      } finally {
+        wLock.unlock();
+      }
+    }
+    
+    if (opTimer != null)
+      opTimer.stop("Binned " + ranges.size() + " ranges for table " + tableId + " to " + binnedRanges.size() + " tservers in %DURATION%");
+    
+    return unbinned;
+  }
+  
+  /**
+   * Records a single extent as bad so that it is re-resolved the next time invalidated extents are processed.
+   */
+  @Override
+  public void invalidateCache(KeyExtent failedExtent) {
+    wLock.lock();
+    try {
+      badExtents.add(failedExtent);
+    } finally {
+      wLock.unlock();
+    }
+    
+    if (log.isTraceEnabled()) {
+      log.trace("Invalidated extent=" + failedExtent);
+    }
+  }
+  
+  /**
+   * Records every extent in {@code keySet} as bad so that they are re-resolved the next time invalidated extents are processed.
+   */
+  @Override
+  public void invalidateCache(Collection<KeyExtent> keySet) {
+    wLock.lock();
+    try {
+      badExtents.addAll(keySet);
+    } finally {
+      wLock.unlock();
+    }
+    
+    if (log.isTraceEnabled()) {
+      log.trace("Invalidated " + keySet.size() + " cache entries for table " + tableId);
+    }
+  }
+  
+  /**
+   * Marks as bad the extent of every cached tablet whose location equals {@code server}.
+   */
+  @Override
+  public void invalidateCache(String server) {
+    int matched = 0;
+    
+    wLock.lock();
+    try {
+      for (TabletLocation cached : metaCache.values()) {
+        if (!cached.tablet_location.equals(server))
+          continue;
+        
+        badExtents.add(cached.tablet_extent);
+        matched++;
+      }
+    } finally {
+      wLock.unlock();
+    }
+    
+    if (log.isTraceEnabled()) {
+      log.trace("invalidated " + matched + " cache entries  table=" + tableId + " server=" + server);
+    }
+  }
+  
+  /**
+   * Drops every cached tablet location for this table.
+   */
+  @Override
+  public void invalidateCache() {
+    int dropped;
+    
+    wLock.lock();
+    try {
+      dropped = metaCache.size();
+      metaCache.clear();
+    } finally {
+      wLock.unlock();
+    }
+    
+    if (log.isTraceEnabled()) {
+      log.trace("invalidated all " + dropped + " cache entries for table=" + tableId);
+    }
+  }
+  
+  /**
+   * Locates the tablet containing {@code row}, taking the locator's locks as needed.
+   * 
+   * @param skipRow
+   *          when true, the lookup is done for the row immediately following {@code row}
+   * @param retry
+   *          when true, keeps retrying (sleeping 100 ms between attempts) until a location is found
+   * @return the tablet location, or null if it was not found and {@code retry} is false
+   */
+  @Override
+  public TabletLocation locateTablet(Text row, boolean skipRow, boolean retry) throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
+    
+    OpTimer opTimer = null;
+    if (log.isTraceEnabled())
+      opTimer = new OpTimer(log, Level.TRACE).start("Locating tablet  table=" + tableId + " row=" + TextUtil.truncate(row) + "  skipRow=" + skipRow + " retry="
+          + retry);
+    
+    TabletLocation found = _locateTablet(row, skipRow, retry, true);
+    
+    while (retry && found == null) {
+      UtilWaitThread.sleep(100);
+      if (log.isTraceEnabled())
+        log.trace("Failed to locate tablet containing row " + TextUtil.truncate(row) + " in table " + tableId + ", will retry...");
+      
+      found = _locateTablet(row, skipRow, retry, true);
+    }
+    
+    if (opTimer != null)
+      opTimer.stop("Located tablet " + (found == null ? null : found.tablet_extent) + " at " + (found == null ? null : found.tablet_location) + " in %DURATION%");
+    
+    return found;
+  }
+  
+  /**
+   * Looks up the tablets around {@code row} through the parent locator and the location obtainer, and stores every returned location in the cache.
+   * 
+   * Does nothing when the parent cannot locate the metadata tablet. If the first metadata tablet yields no locations and is not the root tablet, the next
+   * metadata tablet is tried once.
+   */
+  private void lookupTabletLocation(Text row, boolean retry) throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
+    // the metadata row is the table id, a ';' and then the row itself
+    Text metadataRow = new Text(tableId);
+    metadataRow.append(new byte[] {';'}, 0, 1);
+    metadataRow.append(row.getBytes(), 0, row.getLength());
+    
+    TabletLocation metadataTablet = parent.locateTablet(metadataRow, false, retry);
+    if (metadataTablet == null)
+      return;
+    
+    List<TabletLocation> locations = locationObtainer.lookupTablet(metadataTablet, metadataRow, lastTabletRow, parent);
+    
+    if (locations.size() == 0 && !metadataTablet.tablet_extent.isRootTablet()) {
+      // nothing found, try the next metadata tablet
+      Text metadataEndRow = metadataTablet.tablet_extent.getEndRow();
+      if (metadataEndRow != null && metadataEndRow.compareTo(lastTabletRow) < 0) {
+        TabletLocation nextMetadataTablet = parent.locateTablet(metadataEndRow, true, retry);
+        if (nextMetadataTablet != null)
+          locations = locationObtainer.lookupTablet(nextMetadataTablet, metadataRow, lastTabletRow, parent);
+      }
+    }
+    
+    // the list is not guaranteed to hold contiguous key extents, so each
+    // extent is handled on its own
+    Text previousEndRow = null;
+    for (TabletLocation found : locations) {
+      KeyExtent extent = found.tablet_extent;
+      
+      // when this extent starts where the previous one ended, cache a fresh
+      // location whose extent is rebuilt with the previous end row
+      boolean followsPrevious = previousEndRow != null && extent.getPrevEndRow() != null && extent.getPrevEndRow().equals(previousEndRow);
+      
+      TabletLocation toCache = followsPrevious ? new TabletLocation(new KeyExtent(extent.getTableId(), extent.getEndRow(), previousEndRow),
+          found.tablet_location) : found;
+      
+      // remember the end row for the next iteration
+      previousEndRow = toCache.tablet_extent.getEndRow();
+      
+      updateCache(toCache);
+    }
+  }
+  
+  /**
+   * Stores a tablet location in the cache, first evicting any cached entries whose extents overlap it, and clears overlapping extents from the bad-extent
+   * set.
+   * 
+   * @param tabletLocation
+   *          location to cache; its extent must belong to this locator's table and its location must be non-null
+   * @throws IllegalStateException
+   *           if the extent belongs to another table or the location is null
+   */
+  private void updateCache(TabletLocation tabletLocation) {
+    // sanity check: only extents of this table may enter the cache. (The original repeated this
+    // same condition a second time further down; that copy could never fire and was removed.)
+    if (!tabletLocation.tablet_extent.getTableId().equals(tableId)) {
+      throw new IllegalStateException("Unexpected extent returned " + tableId + "  " + tabletLocation.tablet_extent);
+    }
+    
+    // sanity check: a cache entry without a location is useless to callers
+    if (tabletLocation.tablet_location == null) {
+      throw new IllegalStateException("Cannot add null locations to cache " + tableId + "  " + tabletLocation.tablet_extent);
+    }
+    
+    // clear out any overlapping extents in cache
+    removeOverlapping(metaCache, tabletLocation.tablet_extent);
+    
+    // add it to cache, keyed by end row; MAX_TEXT stands in for a null end row
+    Text er = tabletLocation.tablet_extent.getEndRow();
+    if (er == null)
+      er = MAX_TEXT;
+    metaCache.put(er, tabletLocation);
+    
+    // a freshly looked up extent supersedes any overlapping bad extents
+    if (badExtents.size() > 0)
+      removeOverlapping(badExtents, tabletLocation.tablet_extent);
+  }
+  
+  /**
+   * Removes from {@code metaCache} the run of entries overlapping {@code nke}. Iteration starts at the first entry whose end row lies after
+   * {@code nke}'s previous end row (or at the first entry when that is null) and stops at the first entry for which stopRemoving is true.
+   */
+  static void removeOverlapping(TreeMap<Text,TabletLocation> metaCache, KeyExtent nke) {
+    SortedMap<Text,TabletLocation> candidates = metaCache;
+    if (nke.getPrevEndRow() != null)
+      candidates = metaCache.tailMap(rowAfterPrevRow(nke));
+    
+    for (Iterator<Entry<Text,TabletLocation>> iter = candidates.entrySet().iterator(); iter.hasNext();) {
+      KeyExtent cachedExtent = iter.next().getValue().tablet_extent;
+      
+      if (stopRemoving(nke, cachedExtent))
+        break;
+      
+      iter.remove();
+    }
+  }
+  
+  /**
+   * Tells whether cached extent {@code ke} begins at or after the end of {@code nke}. Returns false if either boundary is null.
+   */
+  private static boolean stopRemoving(KeyExtent nke, KeyExtent ke) {
+    Text cachedPrevEndRow = ke.getPrevEndRow();
+    if (cachedPrevEndRow == null)
+      return false;
+    
+    Text newEndRow = nke.getEndRow();
+    if (newEndRow == null)
+      return false;
+    
+    return cachedPrevEndRow.compareTo(newEndRow) >= 0;
+  }
+  
+  /**
+   * Returns a copy of {@code nke}'s previous end row with a single zero byte appended.
+   */
+  private static Text rowAfterPrevRow(KeyExtent nke) {
+    byte[] zeroByte = new byte[] {0};
+    
+    Text successor = new Text(nke.getPrevEndRow());
+    successor.append(zeroByte, 0, zeroByte.length);
+    return successor;
+  }
+  
+  /**
+   * Removes from {@code extents} every extent that KeyExtent.findOverlapping reports as overlapping {@code nke}.
+   */
+  static void removeOverlapping(TreeSet<KeyExtent> extents, KeyExtent nke) {
+    Iterable<KeyExtent> overlapping = KeyExtent.findOverlapping(nke, extents);
+    
+    for (KeyExtent extent : overlapping) {
+      extents.remove(extent);
+    }
+  }
+  
+  /**
+   * Finds the cached tablet containing {@code row} without performing any lookup.
+   * 
+   * @return the cached location whose end row is the smallest one at or after {@code row}, provided {@code row} also lies after that tablet's previous end
+   *         row; null otherwise
+   */
+  private TabletLocation locateTabletInCache(Text row) {
+    Entry<Text,TabletLocation> candidate = metaCache.ceilingEntry(row);
+    if (candidate == null)
+      return null;
+    
+    TabletLocation location = candidate.getValue();
+    Text prevEndRow = location.tablet_extent.getPrevEndRow();
+    
+    boolean rowInExtent = prevEndRow == null || prevEndRow.compareTo(row) < 0;
+    return rowInExtent ? location : null;
+  }
+  
+  /**
+   * Locates the tablet containing a row, consulting the cache first and falling back to a metadata lookup.
+   * 
+   * @param row
+   *          row to locate
+   * @param skipRow
+   *          when true, the row actually located is a copy of {@code row} with a zero byte appended
+   * @param retry
+   *          passed through to the metadata lookup
+   * @param lock
+   *          when true, this method takes the read lock for the cache check and the write lock for the lookup; when false, no locks are taken here
+   * @return the tablet location, or null if it is still not cached after the lookup
+   */
+  protected TabletLocation _locateTablet(Text row, boolean skipRow, boolean retry, boolean lock) throws AccumuloException, AccumuloSecurityException,
+      TableNotFoundException {
+    
+    Text searchRow = row;
+    if (skipRow) {
+      searchRow = new Text(row);
+      searchRow.append(new byte[] {0}, 0, 1);
+    }
+    
+    TabletLocation cached;
+    
+    if (lock)
+      rLock.lock();
+    try {
+      processInvalidated();
+      cached = locateTabletInCache(searchRow);
+    } finally {
+      if (lock)
+        rLock.unlock();
+    }
+    
+    if (cached != null)
+      return cached;
+    
+    if (lock)
+      wLock.lock();
+    try {
+      // not in cache, so obtain info and check the cache again
+      lookupTabletLocation(searchRow, retry);
+      return locateTabletInCache(searchRow);
+    } finally {
+      if (lock)
+        wLock.unlock();
+    }
+  }
+  
+  /**
+   * Re-resolves every extent recorded in badExtents: evicts the overlapping cache entries, looks the extents up again through the parent locator and the
+   * location obtainer, and caches whatever locations come back.
+   * 
+   * Returns immediately when there are no bad extents. If the current thread does not hold the write lock, this method releases the read lock, takes the
+   * write lock for the duration of the work, and re-takes the read lock before returning. NOTE(review): this means a caller without the write lock is
+   * expected to hold the read lock on entry - confirm for all callers.
+   */
+  private void processInvalidated() throws AccumuloSecurityException, AccumuloException, TableNotFoundException {
+    
+    if (badExtents.size() == 0)
+      return;
+    
+    boolean writeLockHeld = rwLock.isWriteLockedByCurrentThread();
+    try {
+      if (!writeLockHeld) {
+        // swap the read lock for the write lock; the read lock is dropped first, so another
+        // thread may have processed the bad extents in between - hence the re-check below
+        rLock.unlock();
+        wLock.lock();
+        if (badExtents.size() == 0)
+          return;
+      }
+      
+      List<Range> lookups = new ArrayList<Range>(badExtents.size());
+      
+      // evict stale cache entries and collect the metadata range of each bad extent
+      for (KeyExtent be : badExtents) {
+        lookups.add(be.toMetadataRange());
+        removeOverlapping(metaCache, be);
+      }
+      
+      lookups = Range.mergeOverlapping(lookups);
+      
+      Map<String,Map<KeyExtent,List<Range>>> binnedRanges = new HashMap<String,Map<KeyExtent,List<Range>>>();
+      
+      // ask the parent which tablet servers host the metadata ranges
+      parent.binRanges(lookups, binnedRanges);
+      
+      // randomize server order
+      ArrayList<String> tabletServers = new ArrayList<String>(binnedRanges.keySet());
+      Collections.shuffle(tabletServers);
+      
+      for (String tserver : tabletServers) {
+        List<TabletLocation> locations = locationObtainer.lookupTablets(tserver, binnedRanges.get(tserver), parent);
+        
+        // updateCache also removes the overlapping extents from badExtents
+        for (TabletLocation tabletLocation : locations) {
+          updateCache(tabletLocation);
+        }
+      }
+    } finally {
+      if (!writeLockHeld) {
+        // restore the caller's lock state: take the read lock again, then give up the write lock
+        rLock.lock();
+        wLock.unlock();
+      }
+    }
+  }
+  
+  /**
+   * Appends {@code range} to the list kept for tablet {@code ke} on server {@code location}, creating the intermediate map and list on first use.
+   */
+  protected static void addRange(Map<String,Map<KeyExtent,List<Range>>> binnedRanges, String location, KeyExtent ke, Range range) {
+    Map<KeyExtent,List<Range>> rangesByTablet = binnedRanges.get(location);
+    if (rangesByTablet == null) {
+      rangesByTablet = new HashMap<KeyExtent,List<Range>>();
+      binnedRanges.put(location, rangesByTablet);
+    }
+    
+    List<Range> rangesForTablet = rangesByTablet.get(ke);
+    if (rangesForTablet == null) {
+      rangesForTablet = new ArrayList<Range>();
+      rangesByTablet.put(ke, rangesForTablet);
+    }
+    
+    rangesForTablet.add(range);
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7bdbfccb/1.5/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchDeleter.java
----------------------------------------------------------------------
diff --git a/1.5/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchDeleter.java b/1.5/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchDeleter.java
new file mode 100644
index 0000000..be4318e
--- /dev/null
+++ b/1.5/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchDeleter.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.client.impl;
+
+import java.util.Iterator;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.client.BatchDeleter;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.MutationsRejectedException;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.SortedKeyIterator;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.security.ColumnVisibility;
+import org.apache.accumulo.core.security.thrift.AuthInfo;
+
+/**
+ * Batch scanner that can delete everything it scans. It keeps the settings needed to build a batch writer and registers a SortedKeyIterator scan iterator
+ * on construction.
+ */
+public class TabletServerBatchDeleter extends TabletServerBatchReader implements BatchDeleter {
+  
+  // connection settings, reused when creating the batch writer in delete()
+  private Instance instance;
+  private AuthInfo credentials;
+  private String tableId;
+  
+  // batch writer tuning values handed to BatchWriterImpl
+  private long maxMemory;
+  private long maxLatency;
+  private int maxWriteThreads;
+  
+  public TabletServerBatchDeleter(Instance instance, AuthInfo credentials, String tableId, Authorizations authorizations, int numQueryThreads, long maxMemory,
+      long maxLatency, int maxWriteThreads) throws TableNotFoundException {
+    super(instance, credentials, tableId, authorizations, numQueryThreads);
+    
+    this.instance = instance;
+    this.credentials = credentials;
+    this.tableId = tableId;
+    
+    this.maxMemory = maxMemory;
+    this.maxLatency = maxLatency;
+    this.maxWriteThreads = maxWriteThreads;
+    
+    IteratorSetting keysOnly = new IteratorSetting(Integer.MAX_VALUE, BatchDeleter.class.getName() + ".NOVALUE", SortedKeyIterator.class);
+    super.addScanIterator(keysOnly);
+  }
+  
+  /**
+   * Scans the configured ranges and writes one delete mutation per returned key, using that key's row, column family, column qualifier, visibility and
+   * timestamp. The batch writer is always closed, even when scanning or writing fails.
+   */
+  @Override
+  public void delete() throws MutationsRejectedException, TableNotFoundException {
+    BatchWriter writer = null;
+    try {
+      writer = new BatchWriterImpl(instance, credentials, tableId, maxMemory, maxLatency, maxWriteThreads);
+      
+      for (Iterator<Entry<Key,Value>> scanned = super.iterator(); scanned.hasNext();) {
+        Key key = scanned.next().getKey();
+        
+        Mutation deletion = new Mutation(key.getRow());
+        deletion.putDelete(key.getColumnFamily(), key.getColumnQualifier(), new ColumnVisibility(key.getColumnVisibility()), key.getTimestamp());
+        writer.addMutation(deletion);
+      }
+    } finally {
+      if (writer != null)
+        writer.close();
+    }
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7bdbfccb/1.5/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReader.java
----------------------------------------------------------------------
diff --git a/1.5/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReader.java b/1.5/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReader.java
new file mode 100644
index 0000000..c0baade
--- /dev/null
+++ b/1.5/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReader.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.client.impl;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Map.Entry;
+import java.util.concurrent.ExecutorService;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.BatchScanner;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.security.thrift.AuthInfo;
+import org.apache.accumulo.core.util.ArgumentChecker;
+import org.apache.accumulo.core.util.SimpleThreadPool;
+import org.apache.log4j.Logger;
+
+public class TabletServerBatchReader extends ScannerOptions implements BatchScanner {
+  public static final Logger log = Logger.getLogger(TabletServerBatchReader.class);
+  
+  private String table;
+  private int numThreads;
+  private ExecutorService queryThreadPool;
+  
+  private Instance instance;
+  private ArrayList<Range> ranges;
+  
+  private AuthInfo credentials;
+  private Authorizations authorizations = Constants.NO_AUTHS;
+  
+  private static int nextBatchReaderInstance = 1;
+  
+  private static synchronized int getNextBatchReaderInstance() {
+    return nextBatchReaderInstance++;
+  }
+  
+  private final int batchReaderInstance = getNextBatchReaderInstance();
+  
+  public TabletServerBatchReader(Instance instance, AuthInfo credentials, String table, Authorizations authorizations, int numQueryThreads) {
+    ArgumentChecker.notNull(instance, credentials, table, authorizations);
+    this.instance = instance;
+    this.credentials = credentials;
+    this.authorizations = authorizations;
+    this.table = table;
+    this.numThreads = numQueryThreads;
+    
+    queryThreadPool = new SimpleThreadPool(numQueryThreads, "batch scanner " + batchReaderInstance + "-");
+    
+    ranges = null;
+  }
+  
+  public void close() {
+    queryThreadPool.shutdownNow();
+  }
+  
+  /**
+   * Warning: do not rely upon finalize to close this class. Finalize is not guaranteed to be called.
+   */
+  @Override
+  protected void finalize() {
+    if (!queryThreadPool.isShutdown()) {
+      log.warn(TabletServerBatchReader.class.getSimpleName() + " not shutdown; did you forget to call close()?");
+      close();
+    }
+  }
+  
+  @Override
+  public void setRanges(Collection<Range> ranges) {
+    if (ranges == null || ranges.size() == 0) {
+      throw new IllegalArgumentException("ranges must be non null and contain at least 1 range");
+    }
+    
+    if (queryThreadPool.isShutdown()) {
+      throw new IllegalStateException("batch reader closed");
+    }
+    
+    this.ranges = new ArrayList<Range>(ranges);
+    
+  }
+  
+  @Override
+  public Iterator<Entry<Key,Value>> iterator() {
+    if (ranges == null) {
+      throw new IllegalStateException("ranges not set");
+    }
+    
+    if (queryThreadPool.isShutdown()) {
+      throw new IllegalStateException("batch reader closed");
+    }
+    
+    return new TabletServerBatchReaderIterator(instance, credentials, table, authorizations, ranges, numThreads, queryThreadPool, this, timeOut * 1000l);
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7bdbfccb/1.5/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReaderIterator.java
----------------------------------------------------------------------
diff --git a/1.5/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReaderIterator.java b/1.5/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReaderIterator.java
new file mode 100644
index 0000000..7e321f7
--- /dev/null
+++ b/1.5/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReaderIterator.java
@@ -0,0 +1,728 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.client.impl;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.accumulo.cloudtrace.instrument.TraceRunnable;
+import org.apache.accumulo.cloudtrace.instrument.Tracer;
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.TableDeletedException;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.TableOfflineException;
+import org.apache.accumulo.core.client.TimedOutException;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.data.Column;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.data.thrift.InitialMultiScan;
+import org.apache.accumulo.core.data.thrift.MultiScanResult;
+import org.apache.accumulo.core.data.thrift.TKeyExtent;
+import org.apache.accumulo.core.data.thrift.TKeyValue;
+import org.apache.accumulo.core.data.thrift.TRange;
+import org.apache.accumulo.core.master.state.tables.TableState;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.security.thrift.AuthInfo;
+import org.apache.accumulo.core.security.thrift.ThriftSecurityException;
+import org.apache.accumulo.core.tabletserver.thrift.NoSuchScanIDException;
+import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
+import org.apache.accumulo.core.util.ByteBufferUtil;
+import org.apache.accumulo.core.util.OpTimer;
+import org.apache.accumulo.core.util.ThriftUtil;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.hadoop.io.Text;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.thrift.TApplicationException;
+import org.apache.thrift.TException;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+
+
+public class TabletServerBatchReaderIterator implements Iterator<Entry<Key,Value>> {
+  
+  private static final Logger log = Logger.getLogger(TabletServerBatchReaderIterator.class);
+  
+  private final Instance instance;
+  private final AuthInfo credentials;
+  private final String table;
+  private Authorizations authorizations = Constants.NO_AUTHS; // default only; overwritten by the constructor argument
+  private final int numThreads; // also used as the capacity of resultsQueue
+  private final ExecutorService queryThreadPool;
+  private final ScannerOptions options; // private copy of the options passed to the constructor
+  
+  private ArrayBlockingQueue<List<Entry<Key,Value>>> resultsQueue; // batches handed from the query tasks to hasNext()
+  private Iterator<Entry<Key,Value>> batchIterator; // iterator over the batch currently being consumed
+  private List<Entry<Key,Value>> batch; // batch currently being consumed by hasNext()/next()
+  private static final List<Entry<Key,Value>> LAST_BATCH = new ArrayList<Map.Entry<Key,Value>>(); // end-of-results sentinel, compared by identity
+  private Object nextLock = new Object(); // held by hasNext() and next() while they touch batch/batchIterator
+  
+  private long failSleepTime = 100; // wait before retrying failed tablets; doubled on each retry up to 5000
+  
+  private volatile Throwable fatalException = null; // set by worker threads, rethrown from hasNext()
+  
+  private Map<String,TimeoutTracker> timeoutTrackers; // synchronized map keyed by tablet server location
+  private Set<String> timedoutServers; // synchronized set of tablet server locations that have timed out
+  private long timeout; // compared against System.currentTimeMillis() differences by TimeoutTracker
+
+  /**
+   * Callback used by the lookup code to hand over scan results, one batch at a time.
+   */
+  public interface ResultReceiver {
+    /**
+     * Accepts one batch of key/value entries.
+     *
+     * @param entries
+     *          the entries of the batch; the lookup code in this class only passes non-empty lists
+     */
+    void receive(List<Entry<Key,Value>> entries);
+  }
+  
+  /**
+   * Read-only {@link Entry} holding one key/value pair returned by a scan. {@link #setValue(Value)} is not supported.
+   */
+  private static class MyEntry implements Entry<Key,Value> {
+
+    private final Key key;
+    private final Value value;
+
+    MyEntry(Key key, Value value) {
+      this.key = key;
+      this.value = value;
+    }
+
+    @Override
+    public Key getKey() {
+      return this.key;
+    }
+
+    @Override
+    public Value getValue() {
+      return this.value;
+    }
+
+    /**
+     * Always fails; entries are immutable.
+     *
+     * @throws UnsupportedOperationException
+     *           always
+     */
+    @Override
+    public Value setValue(Value value) {
+      throw new UnsupportedOperationException();
+    }
+
+  }
+  
+  public TabletServerBatchReaderIterator(Instance instance, AuthInfo credentials, String table, Authorizations authorizations, ArrayList<Range> ranges,
+      int numThreads, ExecutorService queryThreadPool, ScannerOptions scannerOptions, long timeout) {
+    
+    this.instance = instance;
+    this.credentials = credentials;
+    this.table = table;
+    this.authorizations = authorizations;
+    this.numThreads = numThreads;
+    this.queryThreadPool = queryThreadPool;
+    this.options = new ScannerOptions(scannerOptions);
+    resultsQueue = new ArrayBlockingQueue<List<Entry<Key,Value>>>(numThreads);
+    
+    timeoutTrackers = Collections.synchronizedMap(new HashMap<String,TabletServerBatchReaderIterator.TimeoutTracker>());
+    timedoutServers = Collections.synchronizedSet(new HashSet<String>());
+    this.timeout = timeout;
+
+    if (options.fetchedColumns.size() > 0) {
+      ArrayList<Range> ranges2 = new ArrayList<Range>(ranges.size());
+      for (Range range : ranges) {
+        ranges2.add(range.bound(options.fetchedColumns.first(), options.fetchedColumns.last()));
+      }
+      
+      ranges = ranges2;
+    }
+    
+    ResultReceiver rr = new ResultReceiver() {
+      
+      @Override
+      public void receive(List<Entry<Key,Value>> entries) {
+        try {
+          resultsQueue.put(entries);
+        } catch (InterruptedException e) {
+          if (TabletServerBatchReaderIterator.this.queryThreadPool.isShutdown())
+            log.debug("Failed to add Batch Scan result", e);
+          else
+            log.warn("Failed to add Batch Scan result", e);
+          fatalException = e;
+          throw new RuntimeException(e);
+          
+        }
+      }
+      
+    };
+    
+    try {
+      lookup(ranges, rr);
+    } catch (RuntimeException re) {
+      throw re;
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to create iterator", e);
+    }
+  }
+  
+
+  @Override
+  public boolean hasNext() {
+    synchronized (nextLock) {
+      if (batch == LAST_BATCH)
+        return false;
+      
+      if (batch != null && batchIterator.hasNext())
+        return true;
+      
+      // don't have one cached, try to cache one and return success
+      try {
+        batch = null;
+        while (batch == null && fatalException == null && !queryThreadPool.isShutdown())
+          batch = resultsQueue.poll(1, TimeUnit.SECONDS);
+        
+        if (fatalException != null)
+          if (fatalException instanceof RuntimeException)
+            throw (RuntimeException) fatalException;
+          else
+            throw new RuntimeException(fatalException);
+        
+        if (queryThreadPool.isShutdown())
+          throw new RuntimeException("scanner closed");
+
+        batchIterator = batch.iterator();
+        return batch != LAST_BATCH;
+      } catch (InterruptedException e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+  
+  @Override
+  public Entry<Key,Value> next() {
+    // if there's one waiting, or hasNext() can get one, return it
+    synchronized (nextLock) {
+      if (hasNext())
+        return batchIterator.next();
+      else
+        throw new NoSuchElementException();
+    }
+  }
+  
+  /**
+   * Not supported by this iterator.
+   *
+   * @throws UnsupportedOperationException
+   *           always
+   */
+  @Override
+  public void remove() {
+    throw new UnsupportedOperationException();
+  }
+  
+  /**
+   * Starts the lookup of the given ranges: overlapping ranges are merged, the result is binned by tablet server and tablet, and the query tasks are
+   * submitted through {@link #doLookups}.
+   *
+   * @param ranges
+   *          the ranges to look up
+   * @param receiver
+   *          callback that receives the result batches
+   */
+  private synchronized void lookup(List<Range> ranges, ResultReceiver receiver) throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
+    List<Column> fetchedColumns = new ArrayList<Column>(options.fetchedColumns);
+    List<Range> mergedRanges = Range.mergeOverlapping(ranges);
+
+    // tablet server location -> tablet -> ranges to scan in that tablet
+    Map<String,Map<KeyExtent,List<Range>>> rangesByServer = new HashMap<String,Map<KeyExtent,List<Range>>>();
+    TabletLocator locator = TabletLocator.getInstance(instance, credentials, new Text(table));
+    binRanges(locator, mergedRanges, rangesByServer);
+
+    doLookups(rangesByServer, receiver, fetchedColumns);
+  }
+  
+  /**
+   * Bins the ranges by tablet server and tablet, retrying every 100ms until the tablet locator reports no failures, then clips every range to the
+   * data range of the tablet it was binned to.
+   *
+   * @param tabletLocator
+   *          locator used to bin the ranges
+   * @param ranges
+   *          the ranges to bin
+   * @param binnedRanges
+   *          output map (tablet server location to tablet to ranges); cleared and refilled by this method
+   * @throws TableDeletedException
+   *           if a retry made no progress and the table no longer exists
+   * @throws TableOfflineException
+   *           if a retry made no progress and the table is offline
+   * @throws RuntimeException
+   *           if the thread is interrupted while waiting to retry; the interrupt status is restored before throwing
+   */
+  private void binRanges(TabletLocator tabletLocator, List<Range> ranges, Map<String,Map<KeyExtent,List<Range>>> binnedRanges) throws AccumuloException,
+      AccumuloSecurityException, TableNotFoundException {
+
+    int lastFailureSize = Integer.MAX_VALUE;
+
+    while (true) {
+
+      binnedRanges.clear();
+      List<Range> failures = tabletLocator.binRanges(ranges, binnedRanges);
+
+      if (failures.isEmpty()) {
+        break;
+      }
+
+      // tried to only do table state checks when failures.size() == ranges.size(), however this did
+      // not work because nothing ever invalidated entries in the tabletLocator cache... so even though
+      // the table was deleted the tablet locator entries for the deleted table were not cleared... so
+      // the check is done whenever a retry did not reduce the number of failed ranges
+      if (failures.size() >= lastFailureSize) {
+        if (!Tables.exists(instance, table)) {
+          throw new TableDeletedException(table);
+        } else if (Tables.getTableState(instance, table) == TableState.OFFLINE) {
+          throw new TableOfflineException(instance, table);
+        }
+      }
+
+      lastFailureSize = failures.size();
+
+      if (log.isTraceEnabled()) {
+        log.trace("Failed to bin " + failures.size() + " ranges, tablet locations were null, retrying in 100ms");
+      }
+      try {
+        Thread.sleep(100);
+      } catch (InterruptedException e) {
+        // do not swallow the interrupt: restore the flag so code further up the stack can see it
+        Thread.currentThread().interrupt();
+        throw new RuntimeException(e);
+      }
+    }
+
+    // truncate the ranges to within the tablets... this makes it easier to know what work
+    // needs to be redone when failures occurs and tablets have merged or split
+    Map<String,Map<KeyExtent,List<Range>>> binnedRanges2 = new HashMap<String,Map<KeyExtent,List<Range>>>();
+    for (Entry<String,Map<KeyExtent,List<Range>>> entry : binnedRanges.entrySet()) {
+      Map<KeyExtent,List<Range>> tabletMap = new HashMap<KeyExtent,List<Range>>();
+      binnedRanges2.put(entry.getKey(), tabletMap);
+      for (Entry<KeyExtent,List<Range>> tabletRanges : entry.getValue().entrySet()) {
+        Range tabletRange = tabletRanges.getKey().toDataRange();
+        List<Range> clippedRanges = new ArrayList<Range>();
+        tabletMap.put(tabletRanges.getKey(), clippedRanges);
+        for (Range range : tabletRanges.getValue()) {
+          clippedRanges.add(tabletRange.clip(range));
+        }
+      }
+    }
+
+    binnedRanges.clear();
+    binnedRanges.putAll(binnedRanges2);
+  }
+  
+  /**
+   * Retries the ranges of tablets whose scans failed: waits {@code failSleepTime}, doubles that wait for the next retry (capped at 5000), re-bins
+   * all failed ranges and submits new lookups for them.
+   *
+   * @param failures
+   *          failed tablets and the ranges that still have to be scanned in them
+   * @param receiver
+   *          callback that receives the result batches
+   * @param columns
+   *          columns passed on to the lookups
+   */
+  private void processFailures(Map<KeyExtent,List<Range>> failures, ResultReceiver receiver, List<Column> columns) throws AccumuloException,
+      AccumuloSecurityException, TableNotFoundException {
+    if (log.isTraceEnabled()) {
+      log.trace("Failed to execute multiscans against " + failures.size() + " tablets, retrying...");
+    }
+
+    // back off before retrying, waiting longer after each round of failures
+    UtilWaitThread.sleep(failSleepTime);
+    failSleepTime = Math.min(5000, failSleepTime * 2);
+
+    Map<String,Map<KeyExtent,List<Range>>> rebinned = new HashMap<String,Map<KeyExtent,List<Range>>>();
+
+    List<Range> rangesToRetry = new ArrayList<Range>();
+    for (List<Range> tabletRanges : failures.values()) {
+      rangesToRetry.addAll(tabletRanges);
+    }
+
+    TabletLocator locator = TabletLocator.getInstance(instance, credentials, new Text(table));
+
+    // the first call to binRanges clipped the ranges to within a tablet, so re-binning them
+    // should only bin to the set of failed tablets
+    binRanges(locator, rangesToRetry, rebinned);
+
+    doLookups(rebinned, receiver, columns);
+  }
+  
+  private class QueryTask implements Runnable {
+    
+    private String tsLocation;
+    private Map<KeyExtent,List<Range>> tabletsRanges;
+    private ResultReceiver receiver;
+    private Semaphore semaphore = null;
+    private Map<KeyExtent,List<Range>> failures;
+    private List<Column> columns;
+    private int semaphoreSize;
+    
+    QueryTask(String tsLocation, Map<KeyExtent,List<Range>> tabletsRanges, Map<KeyExtent,List<Range>> failures, ResultReceiver receiver, List<Column> columns) {
+      this.tsLocation = tsLocation;
+      this.tabletsRanges = tabletsRanges;
+      this.receiver = receiver;
+      this.columns = columns;
+      this.failures = failures;
+    }
+    
+    void setSemaphore(Semaphore semaphore, int semaphoreSize) {
+      this.semaphore = semaphore;
+      this.semaphoreSize = semaphoreSize;
+    }
+    
+    public void run() {
+      String threadName = Thread.currentThread().getName();
+      Thread.currentThread().setName(threadName + " looking up " + tabletsRanges.size() + " ranges at " + tsLocation);
+      Map<KeyExtent,List<Range>> unscanned = new HashMap<KeyExtent,List<Range>>();
+      Map<KeyExtent,List<Range>> tsFailures = new HashMap<KeyExtent,List<Range>>();
+      try {
+        TimeoutTracker timeoutTracker = timeoutTrackers.get(tsLocation);
+        if (timeoutTracker == null) {
+          timeoutTracker = new TimeoutTracker(tsLocation, timedoutServers, timeout);
+          timeoutTrackers.put(tsLocation, timeoutTracker);
+        }
+        doLookup(tsLocation, tabletsRanges, tsFailures, unscanned, receiver, columns, credentials, options, authorizations, instance.getConfiguration(),
+            timeoutTracker);
+        if (tsFailures.size() > 0) {
+          TabletLocator tabletLocator = TabletLocator.getInstance(instance, credentials, new Text(table));
+          tabletLocator.invalidateCache(tsFailures.keySet());
+          synchronized (failures) {
+            failures.putAll(tsFailures);
+          }
+        }
+        
+      } catch (IOException e) {
+        synchronized (failures) {
+          failures.putAll(tsFailures);
+          failures.putAll(unscanned);
+        }
+        
+        TabletLocator.getInstance(instance, credentials, new Text(table)).invalidateCache(tsLocation);
+        log.debug(e.getMessage(), e);
+      } catch (AccumuloSecurityException e) {
+        log.debug(e.getMessage(), e);
+        
+        Tables.clearCache(instance);
+        if (!Tables.exists(instance, table))
+          fatalException = new TableDeletedException(table);
+        else
+          fatalException = e;
+      } catch (Throwable t) {
+        if (queryThreadPool.isShutdown())
+          log.debug(t.getMessage(), t);
+        else
+          log.warn(t.getMessage(), t);
+        fatalException = t;
+      } finally {
+        semaphore.release();
+        Thread.currentThread().setName(threadName);
+        if (semaphore.tryAcquire(semaphoreSize)) {
+          // finished processing all queries
+          if (fatalException == null && failures.size() > 0) {
+            // there were some failures
+            try {
+              processFailures(failures, receiver, columns);
+            } catch (TableNotFoundException e) {
+              log.debug(e.getMessage(), e);
+              fatalException = e;
+            } catch (AccumuloException e) {
+              log.debug(e.getMessage(), e);
+              fatalException = e;
+            } catch (AccumuloSecurityException e) {
+              log.debug(e.getMessage(), e);
+              fatalException = e;
+            } catch (Throwable t) {
+              log.debug(t.getMessage(), t);
+              fatalException = t;
+            }
+            
+            if (fatalException != null) {
+              // we are finished with this batch query
+              if (!resultsQueue.offer(LAST_BATCH)) {
+                log.debug("Could not add to result queue after seeing fatalException in processFailures", fatalException);
+              }
+            }
+          } else {
+            // we are finished with this batch query
+            if (fatalException != null) {
+              if (!resultsQueue.offer(LAST_BATCH)) {
+                log.debug("Could not add to result queue after seeing fatalException", fatalException);
+              }
+            } else {
+              try {
+                resultsQueue.put(LAST_BATCH);
+              } catch (InterruptedException e) {
+                fatalException = e;
+                if (!resultsQueue.offer(LAST_BATCH)) {
+                  log.debug("Could not add to result queue after seeing fatalException", fatalException);
+                }
+              }
+            }
+          }
+        }
+      }
+    }
+    
+  }
+  
+  /**
+   * Turns the binned ranges into query tasks and submits them to the query thread pool.
+   * <p>
+   * Tablets on servers already marked as timed out are not queried; they go straight into the failures map shared by the tasks. When there are more
+   * than twice as many threads as tablet servers, the tablets of a server are split over several tasks.
+   *
+   * @param binnedRanges
+   *          tablet server location to tablet to ranges; entries of timed out servers are removed from this map
+   * @param receiver
+   *          callback that receives the result batches
+   * @param columns
+   *          columns passed on to every task
+   * @throws TimedOutException
+   *           if every server in {@code binnedRanges} has already timed out (this includes an empty map)
+   */
+  private void doLookups(Map<String,Map<KeyExtent,List<Range>>> binnedRanges, final ResultReceiver receiver, List<Column> columns) {
+
+    if (timedoutServers.containsAll(binnedRanges.keySet())) {
+      // all servers have timed out
+      throw new TimedOutException(timedoutServers);
+    }
+
+    // when there are lots of threads and a few tablet servers
+    // it is good to break request to tablet servers up, the
+    // following code determines if this is the case
+    int maxTabletsPerRequest = Integer.MAX_VALUE;
+    if (numThreads / binnedRanges.size() > 1) {
+      int tabletCount = 0;
+      for (Map<KeyExtent,List<Range>> serverTablets : binnedRanges.values()) {
+        tabletCount += serverTablets.size();
+      }
+
+      maxTabletsPerRequest = Math.max(1, tabletCount / numThreads);
+    }
+    boolean splitRequests = maxTabletsPerRequest != Integer.MAX_VALUE;
+
+    Map<KeyExtent,List<Range>> failures = new HashMap<KeyExtent,List<Range>>();
+
+    if (!timedoutServers.isEmpty()) {
+      // go ahead and fail any timed out servers
+      Iterator<Entry<String,Map<KeyExtent,List<Range>>>> serverIter = binnedRanges.entrySet().iterator();
+      while (serverIter.hasNext()) {
+        Entry<String,Map<KeyExtent,List<Range>>> serverEntry = serverIter.next();
+        if (timedoutServers.contains(serverEntry.getKey())) {
+          failures.putAll(serverEntry.getValue());
+          serverIter.remove();
+        }
+      }
+    }
+
+    // randomize tabletserver order... this will help when there are multiple
+    // batch readers and writers running against accumulo
+    List<String> locations = new ArrayList<String>(binnedRanges.keySet());
+    Collections.shuffle(locations);
+
+    List<QueryTask> queryTasks = new ArrayList<QueryTask>();
+
+    for (String tsLocation : locations) {
+      Map<KeyExtent,List<Range>> tabletsRanges = binnedRanges.get(tsLocation);
+
+      if (!splitRequests || tabletsRanges.size() == 1) {
+        // one task covers everything hosted on this server
+        queryTasks.add(new QueryTask(tsLocation, tabletsRanges, failures, receiver, columns));
+        continue;
+      }
+
+      // hand out the tablets of this server in chunks of at most maxTabletsPerRequest
+      Map<KeyExtent,List<Range>> chunk = new HashMap<KeyExtent,List<Range>>();
+      for (Entry<KeyExtent,List<Range>> tabletEntry : tabletsRanges.entrySet()) {
+        chunk.put(tabletEntry.getKey(), tabletEntry.getValue());
+        if (chunk.size() >= maxTabletsPerRequest) {
+          queryTasks.add(new QueryTask(tsLocation, chunk, failures, receiver, columns));
+          chunk = new HashMap<KeyExtent,List<Range>>();
+        }
+      }
+
+      if (!chunk.isEmpty()) {
+        queryTasks.add(new QueryTask(tsLocation, chunk, failures, receiver, columns));
+      }
+    }
+
+    // start with every permit taken; each task gives one back when it finishes
+    int taskCount = queryTasks.size();
+    Semaphore allDone = new Semaphore(taskCount);
+    allDone.acquireUninterruptibly(taskCount);
+
+    for (QueryTask task : queryTasks) {
+      task.setSemaphore(allDone, taskCount);
+      queryThreadPool.execute(new TraceRunnable(task));
+    }
+  }
+  
+  /**
+   * Updates the bookkeeping maps from one multi scan result.
+   * <ul>
+   * <li>tablets the server reported as failed are removed from {@code unscanned} and added to {@code failures}</li>
+   * <li>fully scanned tablets are removed from {@code unscanned}</li>
+   * <li>for a partially scanned tablet, ranges that are already finished are dropped and the range containing the resume key is replaced by its
+   * remaining part</li>
+   * </ul>
+   *
+   * @param failures
+   *          receives the failed tablets and their ranges
+   * @param unscanned
+   *          tablets and ranges not scanned yet; modified in place
+   * @param scanResult
+   *          the result returned by the tablet server
+   */
+  static void trackScanning(Map<KeyExtent,List<Range>> failures, Map<KeyExtent,List<Range>> unscanned, MultiScanResult scanResult) {
+
+    // translate returned failures, remove them from unscanned, and add them to failures
+    Translator.ListTranslator<TRange,Range> rangeListTranslator = new Translator.ListTranslator<TRange,Range>(Translator.TRT);
+    Map<KeyExtent,List<Range>> reportedFailures = Translator.translate(scanResult.failures, Translator.TKET, rangeListTranslator);
+    unscanned.keySet().removeAll(reportedFailures.keySet());
+    failures.putAll(reportedFailures);
+
+    // translate full scans and remove them from unscanned
+    HashSet<KeyExtent> completedTablets = new HashSet<KeyExtent>(Translator.translate(scanResult.fullScans, Translator.TKET));
+    unscanned.keySet().removeAll(completedTablets);
+
+    if (scanResult.partScan == null) {
+      // no tablet was left half scanned
+      return;
+    }
+
+    // remove partial scan from unscanned
+    KeyExtent partialTablet = new KeyExtent(scanResult.partScan);
+    Key resumeKey = new Key(scanResult.partNextKey);
+
+    ListIterator<Range> rangeIter = unscanned.get(partialTablet).listIterator();
+    while (rangeIter.hasNext()) {
+      Range range = rangeIter.next();
+
+      boolean alreadyFinished = range.afterEndKey(resumeKey)
+          || (resumeKey.equals(range.getEndKey()) && scanResult.partNextKeyInclusive != range.isEndKeyInclusive());
+
+      if (alreadyFinished) {
+        rangeIter.remove();
+      } else if (range.contains(resumeKey)) {
+        // keep only the part of the range from the resume key onwards
+        rangeIter.remove();
+        rangeIter.add(new Range(resumeKey, scanResult.partNextKeyInclusive, range.getEndKey(), range.isEndKeyInclusive()));
+      }
+    }
+  }
+  
+  private static class TimeoutTracker {
+    
+    String server;
+    Set<String> badServers;
+    long timeOut;
+    long activityTime;
+    Long firstErrorTime = null;
+    
+    TimeoutTracker(String server, Set<String> badServers, long timeOut) {
+      this(timeOut);
+      this.server = server;
+      this.badServers = badServers;
+    }
+
+    TimeoutTracker(long timeOut) {
+      this.timeOut = timeOut;
+    }
+
+    void startingScan() {
+      activityTime = System.currentTimeMillis();
+    }
+    
+    void check() throws IOException {
+      if (System.currentTimeMillis() - activityTime > timeOut) {
+        badServers.add(server);
+        throw new IOException("Time exceeded " + (System.currentTimeMillis() - activityTime) + " " + server);
+      }
+    }
+    
+    void madeProgress() {
+      activityTime = System.currentTimeMillis();
+      firstErrorTime = null;
+    }
+    
+    void errorOccured(Exception e) {
+      if (firstErrorTime == null) {
+        firstErrorTime = activityTime;
+      } else if (System.currentTimeMillis() - firstErrorTime > timeOut) {
+        badServers.add(server);
+      }
+    }
+    
+    /**
+     * @return
+     */
+    public long getTimeOut() {
+      return timeOut;
+    }
+  }
+
+  /**
+   * Convenience overload that runs the lookup with a tracker whose timeout is {@code Long.MAX_VALUE}.
+   */
+  static void doLookup(String server, Map<KeyExtent,List<Range>> requested, Map<KeyExtent,List<Range>> failures, Map<KeyExtent,List<Range>> unscanned,
+      ResultReceiver receiver, List<Column> columns, AuthInfo credentials, ScannerOptions options, Authorizations authorizations, AccumuloConfiguration conf)
+      throws IOException, AccumuloSecurityException, AccumuloServerException {
+    TimeoutTracker unlimited = new TimeoutTracker(Long.MAX_VALUE);
+    doLookup(server, requested, failures, unscanned, receiver, columns, credentials, options, authorizations, conf, unlimited);
+  }
+  
+  /**
+   * Runs a multi scan for the requested tablets and ranges against one tablet server and passes every non-empty batch of results to the receiver.
+   * Returns immediately when {@code requested} is empty.
+   *
+   * @param server
+   *          location of the tablet server
+   * @param requested
+   *          tablets and ranges to scan
+   * @param failures
+   *          receives the tablets the server reported as failed
+   * @param unscanned
+   *          filled with a copy of {@code requested}; entries are removed as they are scanned
+   * @param receiver
+   *          callback that receives the result batches
+   * @param timeoutTracker
+   *          checked before each continuation of the scan and notified of progress and errors
+   * @throws IOException
+   *           on transport or other thrift failures, an unknown scan id, or when the tracker reports a timeout
+   * @throws AccumuloSecurityException
+   *           if the server reports a security error
+   * @throws AccumuloServerException
+   *           if the server reports an application error
+   */
+  static void doLookup(String server, Map<KeyExtent,List<Range>> requested, Map<KeyExtent,List<Range>> failures, Map<KeyExtent,List<Range>> unscanned,
+      ResultReceiver receiver, List<Column> columns, AuthInfo credentials, ScannerOptions options, Authorizations authorizations, AccumuloConfiguration conf,
+      TimeoutTracker timeoutTracker) throws IOException, AccumuloSecurityException, AccumuloServerException {
+
+    if (requested.isEmpty()) {
+      return;
+    }
+
+    // copy requested to unscanned map. we will remove ranges as they are scanned in trackScanning()
+    for (Entry<KeyExtent,List<Range>> tablet : requested.entrySet()) {
+      ArrayList<Range> rangeCopies = new ArrayList<Range>();
+      for (Range range : tablet.getValue()) {
+        rangeCopies.add(new Range(range));
+      }
+      unscanned.put(new KeyExtent(tablet.getKey()), rangeCopies);
+    }
+
+    timeoutTracker.startingScan();
+    // NOTE(review): transport is never assigned, so the finally block below always passes null to returnTransport -- confirm whether it can go
+    TTransport transport = null;
+    try {
+      TabletClientService.Client client;
+      if (timeoutTracker.getTimeOut() < Integer.MAX_VALUE * 1000L) {
+        client = ThriftUtil.getTServerClient(server, conf, timeoutTracker.getTimeOut());
+      } else {
+        client = ThriftUtil.getTServerClient(server, conf);
+      }
+
+      try {
+        OpTimer opTimer = new OpTimer(log, Level.TRACE).start("Starting multi scan, tserver=" + server + "  #tablets=" + requested.size() + "  #ranges="
+            + sumSizes(requested.values()) + " ssil=" + options.serverSideIteratorList + " ssio=" + options.serverSideIteratorOptions);
+
+        TabletType ttype = TabletType.type(requested.keySet());
+        boolean waitForWrites = !ThriftScanner.serversWaitedForWrites.get(ttype).contains(server);
+
+        Map<TKeyExtent,List<TRange>> thriftTabletRanges = Translator.translate(requested, Translator.KET, new Translator.ListTranslator<Range,TRange>(
+            Translator.RT));
+        InitialMultiScan imsr = client.startMultiScan(Tracer.traceInfo(), credentials, thriftTabletRanges, Translator.translate(columns, Translator.CT),
+            options.serverSideIteratorList, options.serverSideIteratorOptions, ByteBufferUtil.toByteBuffers(authorizations.getAuthorizations()), waitForWrites);
+        if (waitForWrites) {
+          ThriftScanner.serversWaitedForWrites.get(ttype).add(server);
+        }
+
+        MultiScanResult scanResult = imsr.result;
+
+        opTimer.stop("Got 1st multi scan results, #results=" + scanResult.results.size() + (scanResult.more ? "  scanID=" + imsr.scanID : "")
+            + " in %DURATION%");
+
+        handleScanResult(scanResult, receiver, timeoutTracker, failures, unscanned);
+
+        while (scanResult.more) {
+
+          timeoutTracker.check();
+
+          opTimer.start("Continuing multi scan, scanid=" + imsr.scanID);
+          scanResult = client.continueMultiScan(Tracer.traceInfo(), imsr.scanID);
+          opTimer.stop("Got more multi scan results, #results=" + scanResult.results.size() + (scanResult.more ? "  scanID=" + imsr.scanID : "")
+              + " in %DURATION%");
+
+          handleScanResult(scanResult, receiver, timeoutTracker, failures, unscanned);
+        }
+
+        client.closeMultiScan(Tracer.traceInfo(), imsr.scanID);
+
+      } finally {
+        ThriftUtil.returnClient(client);
+      }
+    } catch (TTransportException e) {
+      log.debug("Server : " + server + " msg : " + e.getMessage());
+      timeoutTracker.errorOccured(e);
+      throw new IOException(e);
+    } catch (ThriftSecurityException e) {
+      log.debug("Server : " + server + " msg : " + e.getMessage(), e);
+      throw new AccumuloSecurityException(e.user, e.code, e);
+    } catch (TApplicationException e) {
+      log.debug("Server : " + server + " msg : " + e.getMessage(), e);
+      throw new AccumuloServerException(server, e);
+    } catch (TException e) {
+      log.debug("Server : " + server + " msg : " + e.getMessage(), e);
+      timeoutTracker.errorOccured(e);
+      throw new IOException(e);
+    } catch (NoSuchScanIDException e) {
+      log.debug("Server : " + server + " msg : " + e.getMessage(), e);
+      throw new IOException(e);
+    } finally {
+      ThriftTransportPool.getInstance().returnTransport(transport);
+    }
+  }
+
+  /**
+   * Handles one multi scan result: converts the returned key/values into entries, passes them to the receiver when there are any, tells the tracker
+   * about progress (entries returned or tablets fully scanned) and updates the failures/unscanned bookkeeping.
+   */
+  private static void handleScanResult(MultiScanResult scanResult, ResultReceiver receiver, TimeoutTracker timeoutTracker,
+      Map<KeyExtent,List<Range>> failures, Map<KeyExtent,List<Range>> unscanned) {
+    ArrayList<Entry<Key,Value>> entries = new ArrayList<Map.Entry<Key,Value>>(scanResult.results.size());
+    for (TKeyValue kv : scanResult.results) {
+      entries.add(new MyEntry(new Key(kv.key), new Value(kv.value)));
+    }
+
+    if (entries.size() > 0) {
+      receiver.receive(entries);
+    }
+
+    if (entries.size() > 0 || scanResult.fullScans.size() > 0) {
+      timeoutTracker.madeProgress();
+    }
+
+    trackScanning(failures, unscanned, scanResult);
+  }
+  
+  /**
+   * Adds up the sizes of all the given lists.
+   *
+   * @param values
+   *          the lists to count
+   * @return the total number of ranges in all lists, 0 for an empty collection
+   */
+  static int sumSizes(Collection<List<Range>> values) {
+    int total = 0;
+
+    Iterator<List<Range>> lists = values.iterator();
+    while (lists.hasNext()) {
+      total += lists.next().size();
+    }
+
+    return total;
+  }
+}


Mime
View raw message