accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From e..@apache.org
Subject [2/2] accumulo git commit: ACCUMULO-3795 add scanner batch timeout
Date Fri, 26 Jun 2015 16:19:32 GMT
ACCUMULO-3795 add scanner batch timeout


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/81994647
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/81994647
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/81994647

Branch: refs/heads/master
Commit: 819946477c7e9f6604332c3828021fe8db52cddb
Parents: d65f514
Author: Eric C. Newton <eric.newton@gmail.com>
Authored: Fri Jun 26 12:19:23 2015 -0400
Committer: Eric C. Newton <eric.newton@gmail.com>
Committed: Fri Jun 26 12:19:23 2015 -0400

----------------------------------------------------------------------
 .../core/client/ClientSideIteratorScanner.java  |   2 +
 .../accumulo/core/client/IsolatedScanner.java   |   1 +
 .../accumulo/core/client/ScannerBase.java       |  22 ++
 .../core/client/impl/ScannerOptions.java        |  20 ++
 .../impl/TabletServerBatchReaderIterator.java   |   2 +-
 .../core/client/impl/ThriftScanner.java         |  18 +-
 .../core/metadata/MetadataLocationObtainer.java |   4 +-
 .../thrift/TabletClientService.java             | 252 ++++++++++++++++---
 core/src/main/thrift/tabletserver.thrift        |   8 +-
 .../client/impl/TableOperationsImplTest.java    |   1 +
 .../server/util/VerifyTabletAssignments.java    |   2 +-
 .../monitor/servlets/trace/NullScanner.java     |  11 +
 .../apache/accumulo/tserver/TabletServer.java   |  15 +-
 .../accumulo/tserver/scan/LookupTask.java       |   2 +-
 .../tserver/session/MultiScanSession.java       |   4 +-
 .../accumulo/tserver/session/ScanSession.java   |   4 +-
 .../accumulo/tserver/tablet/ScanDataSource.java |   4 +-
 .../accumulo/tserver/tablet/ScanOptions.java    |   8 +-
 .../apache/accumulo/tserver/tablet/Scanner.java |   2 +-
 .../apache/accumulo/tserver/tablet/Tablet.java  |  36 ++-
 .../accumulo/test/ScanFlushWithTimeIT.java      | 107 ++++++++
 .../test/performance/thrift/NullTserver.java    |   4 +-
 22 files changed, 461 insertions(+), 68 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/81994647/core/src/main/java/org/apache/accumulo/core/client/ClientSideIteratorScanner.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/ClientSideIteratorScanner.java b/core/src/main/java/org/apache/accumulo/core/client/ClientSideIteratorScanner.java
index f077573..10931f5 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/ClientSideIteratorScanner.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/ClientSideIteratorScanner.java
@@ -155,6 +155,7 @@ public class ClientSideIteratorScanner extends ScannerOptions implements Scanner
     this.range = scanner.getRange();
     this.size = scanner.getBatchSize();
     this.timeOut = scanner.getTimeout(TimeUnit.MILLISECONDS);
+    this.batchTimeOut = scanner.getBatchTimeout(TimeUnit.MILLISECONDS);
     this.readaheadThreshold = scanner.getReadaheadThreshold();
   }
 
@@ -169,6 +170,7 @@ public class ClientSideIteratorScanner extends ScannerOptions implements Scanner
   public Iterator<Entry<Key,Value>> iterator() {
     smi.scanner.setBatchSize(size);
     smi.scanner.setTimeout(timeOut, TimeUnit.MILLISECONDS);
+    smi.scanner.setBatchTimeout(batchTimeOut, TimeUnit.MILLISECONDS);
     smi.scanner.setReadaheadThreshold(readaheadThreshold);
     if (isolated)
       smi.scanner.enableIsolation();

http://git-wip-us.apache.org/repos/asf/accumulo/blob/81994647/core/src/main/java/org/apache/accumulo/core/client/IsolatedScanner.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/IsolatedScanner.java b/core/src/main/java/org/apache/accumulo/core/client/IsolatedScanner.java
index e530100..2e9f1d5 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/IsolatedScanner.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/IsolatedScanner.java
@@ -226,6 +226,7 @@ public class IsolatedScanner extends ScannerOptions implements Scanner {
     this.scanner = scanner;
     this.range = scanner.getRange();
     this.timeOut = scanner.getTimeout(TimeUnit.MILLISECONDS);
+    this.batchTimeOut = scanner.getBatchTimeout(TimeUnit.MILLISECONDS);
     this.batchSize = scanner.getBatchSize();
     this.readaheadThreshold = scanner.getReadaheadThreshold();
     this.bufferFactory = bufferFactory;

http://git-wip-us.apache.org/repos/asf/accumulo/blob/81994647/core/src/main/java/org/apache/accumulo/core/client/ScannerBase.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/ScannerBase.java b/core/src/main/java/org/apache/accumulo/core/client/ScannerBase.java
index 92ab551..d33df03 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/ScannerBase.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/ScannerBase.java
@@ -154,4 +154,26 @@ public interface ScannerBase extends Iterable<Entry<Key,Value>> {
    * @return The authorizations set on the scanner instance
    */
   Authorizations getAuthorizations();
+
+  /**
+   * This setting determines how long a scanner will wait to fill the returned batch. By default, a scanner will wait until the batch is full.
+   *
+   * <p>
+   * Setting the timeout to zero (with any time unit) or {@link Long#MAX_VALUE} (with {@link TimeUnit#MILLISECONDS}) means no timeout.
+   *
+   * @param timeout
+   *          the length of the timeout
+   * @param timeUnit
+   *          the units of the timeout
+   * @since 1.8.0
+   */
+  void setBatchTimeout(long timeout, TimeUnit timeUnit);
+
+  /**
+   * Returns the timeout to fill a batch in the given TimeUnit.
+   *
+   * @return the batch timeout configured for this scanner, expressed in the given time unit
+   * @since 1.8.0
+   */
+  long getBatchTimeout(TimeUnit timeUnit);
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/81994647/core/src/main/java/org/apache/accumulo/core/client/impl/ScannerOptions.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/ScannerOptions.java b/core/src/main/java/org/apache/accumulo/core/client/impl/ScannerOptions.java
index e455d5a..cc337dd 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/ScannerOptions.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ScannerOptions.java
@@ -49,6 +49,8 @@ public class ScannerOptions implements ScannerBase {
 
   protected long timeOut = Long.MAX_VALUE;
 
+  protected long batchTimeOut = Long.MAX_VALUE;
+
   private String regexIterName = null;
 
   protected ScannerOptions() {}
@@ -166,6 +168,7 @@ public class ScannerOptions implements ScannerBase {
         Set<Entry<String,Map<String,String>>> es = src.serverSideIteratorOptions.entrySet();
         for (Entry<String,Map<String,String>> entry : es)
           dst.serverSideIteratorOptions.put(entry.getKey(), new HashMap<String,String>(entry.getValue()));
+        dst.batchTimeOut = src.batchTimeOut;
       }
     }
   }
@@ -201,4 +204,21 @@ public class ScannerOptions implements ScannerBase {
   public Authorizations getAuthorizations() {
     throw new UnsupportedOperationException("No authorizations to return");
   }
+
+  @Override
+  public void setBatchTimeout(long timeout, TimeUnit timeUnit) {
+    if (timeout < 0) { // validate the argument, not the scan-timeout field timeOut
+      throw new IllegalArgumentException("Batch timeout must be positive : " + timeout);
+    }
+    if (timeout == 0) { // zero, in any unit, means "no batch timeout"
+      this.batchTimeOut = Long.MAX_VALUE;
+    } else {
+      this.batchTimeOut = timeUnit.toMillis(timeout);
+    }
+  }
+
+  @Override
+  public long getBatchTimeout(TimeUnit timeUnit) {
+    return timeUnit.convert(batchTimeOut, TimeUnit.MILLISECONDS); // batchTimeOut is held in milliseconds
+  }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/81994647/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReaderIterator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReaderIterator.java b/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReaderIterator.java
index 053f2b3..f263581 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReaderIterator.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReaderIterator.java
@@ -635,7 +635,7 @@ public class TabletServerBatchReaderIterator implements Iterator<Entry<Key,Value
             Translators.RT));
         InitialMultiScan imsr = client.startMultiScan(Tracer.traceInfo(), context.rpcCreds(), thriftTabletRanges,
             Translator.translate(columns, Translators.CT), options.serverSideIteratorList, options.serverSideIteratorOptions,
-            ByteBufferUtil.toByteBuffers(authorizations.getAuthorizations()), waitForWrites);
+            ByteBufferUtil.toByteBuffers(authorizations.getAuthorizations()), waitForWrites, options.batchTimeOut);
         if (waitForWrites)
           ThriftScanner.serversWaitedForWrites.get(ttype).add(server.toString());
 

http://git-wip-us.apache.org/repos/asf/accumulo/blob/81994647/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftScanner.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftScanner.java b/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftScanner.java
index 39d3b32..2116cf2 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftScanner.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftScanner.java
@@ -80,7 +80,7 @@ public class ThriftScanner {
 
   public static boolean getBatchFromServer(ClientContext context, Range range, KeyExtent extent, String server, SortedMap<Key,Value> results,
       SortedSet<Column> fetchedColumns, List<IterInfo> serverSideIteratorList, Map<String,Map<String,String>> serverSideIteratorOptions, int size,
-      Authorizations authorizations, boolean retry) throws AccumuloException, AccumuloSecurityException, NotServingTabletException {
+      Authorizations authorizations, boolean retry, long batchTimeOut) throws AccumuloException, AccumuloSecurityException, NotServingTabletException {
     if (server == null)
       throw new AccumuloException(new IOException());
 
@@ -91,13 +91,13 @@ public class ThriftScanner {
       try {
         // not reading whole rows (or stopping on row boundries) so there is no need to enable isolation below
         ScanState scanState = new ScanState(context, extent.getTableId(), authorizations, range, fetchedColumns, size, serverSideIteratorList,
-            serverSideIteratorOptions, false);
+            serverSideIteratorOptions, false, batchTimeOut);
 
         TabletType ttype = TabletType.type(extent);
         boolean waitForWrites = !serversWaitedForWrites.get(ttype).contains(server);
         InitialScan isr = client.startScan(tinfo, scanState.context.rpcCreds(), extent.toThrift(), scanState.range.toThrift(),
             Translator.translate(scanState.columns, Translators.CT), scanState.size, scanState.serverSideIteratorList, scanState.serverSideIteratorOptions,
-            scanState.authorizations.getAuthorizationsBB(), waitForWrites, scanState.isolated, scanState.readaheadThreshold);
+            scanState.authorizations.getAuthorizationsBB(), waitForWrites, scanState.isolated, scanState.readaheadThreshold, scanState.batchTimeOut);
         if (waitForWrites)
           serversWaitedForWrites.get(ttype).add(server);
 
@@ -133,6 +133,7 @@ public class ThriftScanner {
     Text startRow;
     boolean skipStartRow;
     long readaheadThreshold;
+    long batchTimeOut;
 
     Range range;
 
@@ -152,13 +153,14 @@ public class ThriftScanner {
     Map<String,Map<String,String>> serverSideIteratorOptions;
 
     public ScanState(ClientContext context, Text tableId, Authorizations authorizations, Range range, SortedSet<Column> fetchedColumns, int size,
-        List<IterInfo> serverSideIteratorList, Map<String,Map<String,String>> serverSideIteratorOptions, boolean isolated) {
+        List<IterInfo> serverSideIteratorList, Map<String,Map<String,String>> serverSideIteratorOptions, boolean isolated, long batchTimeOut) {
       this(context, tableId, authorizations, range, fetchedColumns, size, serverSideIteratorList, serverSideIteratorOptions, isolated,
-          Constants.SCANNER_DEFAULT_READAHEAD_THRESHOLD);
+          Constants.SCANNER_DEFAULT_READAHEAD_THRESHOLD, batchTimeOut);
     }
 
     public ScanState(ClientContext context, Text tableId, Authorizations authorizations, Range range, SortedSet<Column> fetchedColumns, int size,
-        List<IterInfo> serverSideIteratorList, Map<String,Map<String,String>> serverSideIteratorOptions, boolean isolated, long readaheadThreshold) {
+        List<IterInfo> serverSideIteratorList, Map<String,Map<String,String>> serverSideIteratorOptions, boolean isolated, long readaheadThreshold,
+        long batchTimeOut) {
       this.context = context;
       ;
       this.authorizations = authorizations;
@@ -186,7 +188,7 @@ public class ThriftScanner {
 
       this.isolated = isolated;
       this.readaheadThreshold = readaheadThreshold;
-
+      this.batchTimeOut = batchTimeOut;
     }
   }
 
@@ -409,7 +411,7 @@ public class ThriftScanner {
         boolean waitForWrites = !serversWaitedForWrites.get(ttype).contains(loc.tablet_location);
         InitialScan is = client.startScan(tinfo, scanState.context.rpcCreds(), loc.tablet_extent.toThrift(), scanState.range.toThrift(),
             Translator.translate(scanState.columns, Translators.CT), scanState.size, scanState.serverSideIteratorList, scanState.serverSideIteratorOptions,
-            scanState.authorizations.getAuthorizationsBB(), waitForWrites, scanState.isolated, scanState.readaheadThreshold);
+            scanState.authorizations.getAuthorizationsBB(), waitForWrites, scanState.isolated, scanState.readaheadThreshold, scanState.batchTimeOut);
         if (waitForWrites)
           serversWaitedForWrites.get(ttype).add(loc.tablet_location);
 

http://git-wip-us.apache.org/repos/asf/accumulo/blob/81994647/core/src/main/java/org/apache/accumulo/core/metadata/MetadataLocationObtainer.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/MetadataLocationObtainer.java b/core/src/main/java/org/apache/accumulo/core/metadata/MetadataLocationObtainer.java
index c8c61aa..0d294b8 100644
--- a/core/src/main/java/org/apache/accumulo/core/metadata/MetadataLocationObtainer.java
+++ b/core/src/main/java/org/apache/accumulo/core/metadata/MetadataLocationObtainer.java
@@ -93,7 +93,7 @@ public class MetadataLocationObtainer implements TabletLocationObtainer {
       serverSideIteratorList.add(new IterInfo(10000, WholeRowIterator.class.getName(), "WRI"));
       Map<String,Map<String,String>> serverSideIteratorOptions = Collections.emptyMap();
       boolean more = ThriftScanner.getBatchFromServer(context, range, src.tablet_extent, src.tablet_location, encodedResults, locCols, serverSideIteratorList,
-          serverSideIteratorOptions, Constants.SCAN_BATCH_SIZE, Authorizations.EMPTY, false);
+          serverSideIteratorOptions, Constants.SCAN_BATCH_SIZE, Authorizations.EMPTY, false, 0L);
 
       decodeRows(encodedResults, results);
 
@@ -101,7 +101,7 @@ public class MetadataLocationObtainer implements TabletLocationObtainer {
         range = new Range(results.lastKey().followingKey(PartialKey.ROW_COLFAM_COLQUAL_COLVIS_TIME), true, new Key(stopRow).followingKey(PartialKey.ROW), false);
         encodedResults.clear();
         more = ThriftScanner.getBatchFromServer(context, range, src.tablet_extent, src.tablet_location, encodedResults, locCols, serverSideIteratorList,
-            serverSideIteratorOptions, Constants.SCAN_BATCH_SIZE, Authorizations.EMPTY, false);
+            serverSideIteratorOptions, Constants.SCAN_BATCH_SIZE, Authorizations.EMPTY, false, 0L);
 
         decodeRows(encodedResults, results);
       }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/81994647/core/src/main/java/org/apache/accumulo/core/tabletserver/thrift/TabletClientService.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/tabletserver/thrift/TabletClientService.java b/core/src/main/java/org/apache/accumulo/core/tabletserver/thrift/TabletClientService.java
index 02bd4e1..bd0f79c 100644
--- a/core/src/main/java/org/apache/accumulo/core/tabletserver/thrift/TabletClientService.java
+++ b/core/src/main/java/org/apache/accumulo/core/tabletserver/thrift/TabletClientService.java
@@ -52,13 +52,13 @@ import org.slf4j.LoggerFactory;
 
   public interface Iface extends org.apache.accumulo.core.client.impl.thrift.ClientService.Iface {
 
-    public org.apache.accumulo.core.data.thrift.InitialScan startScan(org.apache.accumulo.core.trace.thrift.TInfo tinfo, org.apache.accumulo.core.security.thrift.TCredentials credentials, org.apache.accumulo.core.data.thrift.TKeyExtent extent, org.apache.accumulo.core.data.thrift.TRange range, List<org.apache.accumulo.core.data.thrift.TColumn> columns, int batchSize, List<org.apache.accumulo.core.data.thrift.IterInfo> ssiList, Map<String,Map<String,String>> ssio, List<ByteBuffer> authorizations, boolean waitForWrites, boolean isolated, long readaheadThreshold) throws org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException, NotServingTabletException, TooManyFilesException, org.apache.thrift.TException;
+    public org.apache.accumulo.core.data.thrift.InitialScan startScan(org.apache.accumulo.core.trace.thrift.TInfo tinfo, org.apache.accumulo.core.security.thrift.TCredentials credentials, org.apache.accumulo.core.data.thrift.TKeyExtent extent, org.apache.accumulo.core.data.thrift.TRange range, List<org.apache.accumulo.core.data.thrift.TColumn> columns, int batchSize, List<org.apache.accumulo.core.data.thrift.IterInfo> ssiList, Map<String,Map<String,String>> ssio, List<ByteBuffer> authorizations, boolean waitForWrites, boolean isolated, long readaheadThreshold, long batchTimeOut) throws org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException, NotServingTabletException, TooManyFilesException, org.apache.thrift.TException;
 
     public org.apache.accumulo.core.data.thrift.ScanResult continueScan(org.apache.accumulo.core.trace.thrift.TInfo tinfo, long scanID) throws NoSuchScanIDException, NotServingTabletException, TooManyFilesException, org.apache.thrift.TException;
 
     public void closeScan(org.apache.accumulo.core.trace.thrift.TInfo tinfo, long scanID) throws org.apache.thrift.TException;
 
-    public org.apache.accumulo.core.data.thrift.InitialMultiScan startMultiScan(org.apache.accumulo.core.trace.thrift.TInfo tinfo, org.apache.accumulo.core.security.thrift.TCredentials credentials, Map<org.apache.accumulo.core.data.thrift.TKeyExtent,List<org.apache.accumulo.core.data.thrift.TRange>> batch, List<org.apache.accumulo.core.data.thrift.TColumn> columns, List<org.apache.accumulo.core.data.thrift.IterInfo> ssiList, Map<String,Map<String,String>> ssio, List<ByteBuffer> authorizations, boolean waitForWrites) throws org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException, org.apache.thrift.TException;
+    public org.apache.accumulo.core.data.thrift.InitialMultiScan startMultiScan(org.apache.accumulo.core.trace.thrift.TInfo tinfo, org.apache.accumulo.core.security.thrift.TCredentials credentials, Map<org.apache.accumulo.core.data.thrift.TKeyExtent,List<org.apache.accumulo.core.data.thrift.TRange>> batch, List<org.apache.accumulo.core.data.thrift.TColumn> columns, List<org.apache.accumulo.core.data.thrift.IterInfo> ssiList, Map<String,Map<String,String>> ssio, List<ByteBuffer> authorizations, boolean waitForWrites, long batchTimeOut) throws org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException, org.apache.thrift.TException;
 
     public org.apache.accumulo.core.data.thrift.MultiScanResult continueMultiScan(org.apache.accumulo.core.trace.thrift.TInfo tinfo, long scanID) throws NoSuchScanIDException, org.apache.thrift.TException;
 
@@ -118,13 +118,13 @@ import org.slf4j.LoggerFactory;
 
   public interface AsyncIface extends org.apache.accumulo.core.client.impl.thrift.ClientService .AsyncIface {
 
-    public void startScan(org.apache.accumulo.core.trace.thrift.TInfo tinfo, org.apache.accumulo.core.security.thrift.TCredentials credentials, org.apache.accumulo.core.data.thrift.TKeyExtent extent, org.apache.accumulo.core.data.thrift.TRange range, List<org.apache.accumulo.core.data.thrift.TColumn> columns, int batchSize, List<org.apache.accumulo.core.data.thrift.IterInfo> ssiList, Map<String,Map<String,String>> ssio, List<ByteBuffer> authorizations, boolean waitForWrites, boolean isolated, long readaheadThreshold, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException;
+    public void startScan(org.apache.accumulo.core.trace.thrift.TInfo tinfo, org.apache.accumulo.core.security.thrift.TCredentials credentials, org.apache.accumulo.core.data.thrift.TKeyExtent extent, org.apache.accumulo.core.data.thrift.TRange range, List<org.apache.accumulo.core.data.thrift.TColumn> columns, int batchSize, List<org.apache.accumulo.core.data.thrift.IterInfo> ssiList, Map<String,Map<String,String>> ssio, List<ByteBuffer> authorizations, boolean waitForWrites, boolean isolated, long readaheadThreshold, long batchTimeOut, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException;
 
     public void continueScan(org.apache.accumulo.core.trace.thrift.TInfo tinfo, long scanID, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException;
 
     public void closeScan(org.apache.accumulo.core.trace.thrift.TInfo tinfo, long scanID, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException;
 
-    public void startMultiScan(org.apache.accumulo.core.trace.thrift.TInfo tinfo, org.apache.accumulo.core.security.thrift.TCredentials credentials, Map<org.apache.accumulo.core.data.thrift.TKeyExtent,List<org.apache.accumulo.core.data.thrift.TRange>> batch, List<org.apache.accumulo.core.data.thrift.TColumn> columns, List<org.apache.accumulo.core.data.thrift.IterInfo> ssiList, Map<String,Map<String,String>> ssio, List<ByteBuffer> authorizations, boolean waitForWrites, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException;
+    public void startMultiScan(org.apache.accumulo.core.trace.thrift.TInfo tinfo, org.apache.accumulo.core.security.thrift.TCredentials credentials, Map<org.apache.accumulo.core.data.thrift.TKeyExtent,List<org.apache.accumulo.core.data.thrift.TRange>> batch, List<org.apache.accumulo.core.data.thrift.TColumn> columns, List<org.apache.accumulo.core.data.thrift.IterInfo> ssiList, Map<String,Map<String,String>> ssio, List<ByteBuffer> authorizations, boolean waitForWrites, long batchTimeOut, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException;
 
     public void continueMultiScan(org.apache.accumulo.core.trace.thrift.TInfo tinfo, long scanID, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException;
 
@@ -202,13 +202,13 @@ import org.slf4j.LoggerFactory;
       super(iprot, oprot);
     }
 
-    public org.apache.accumulo.core.data.thrift.InitialScan startScan(org.apache.accumulo.core.trace.thrift.TInfo tinfo, org.apache.accumulo.core.security.thrift.TCredentials credentials, org.apache.accumulo.core.data.thrift.TKeyExtent extent, org.apache.accumulo.core.data.thrift.TRange range, List<org.apache.accumulo.core.data.thrift.TColumn> columns, int batchSize, List<org.apache.accumulo.core.data.thrift.IterInfo> ssiList, Map<String,Map<String,String>> ssio, List<ByteBuffer> authorizations, boolean waitForWrites, boolean isolated, long readaheadThreshold) throws org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException, NotServingTabletException, TooManyFilesException, org.apache.thrift.TException
+    public org.apache.accumulo.core.data.thrift.InitialScan startScan(org.apache.accumulo.core.trace.thrift.TInfo tinfo, org.apache.accumulo.core.security.thrift.TCredentials credentials, org.apache.accumulo.core.data.thrift.TKeyExtent extent, org.apache.accumulo.core.data.thrift.TRange range, List<org.apache.accumulo.core.data.thrift.TColumn> columns, int batchSize, List<org.apache.accumulo.core.data.thrift.IterInfo> ssiList, Map<String,Map<String,String>> ssio, List<ByteBuffer> authorizations, boolean waitForWrites, boolean isolated, long readaheadThreshold, long batchTimeOut) throws org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException, NotServingTabletException, TooManyFilesException, org.apache.thrift.TException
     {
-      send_startScan(tinfo, credentials, extent, range, columns, batchSize, ssiList, ssio, authorizations, waitForWrites, isolated, readaheadThreshold);
+      send_startScan(tinfo, credentials, extent, range, columns, batchSize, ssiList, ssio, authorizations, waitForWrites, isolated, readaheadThreshold, batchTimeOut);
       return recv_startScan();
     }
 
-    public void send_startScan(org.apache.accumulo.core.trace.thrift.TInfo tinfo, org.apache.accumulo.core.security.thrift.TCredentials credentials, org.apache.accumulo.core.data.thrift.TKeyExtent extent, org.apache.accumulo.core.data.thrift.TRange range, List<org.apache.accumulo.core.data.thrift.TColumn> columns, int batchSize, List<org.apache.accumulo.core.data.thrift.IterInfo> ssiList, Map<String,Map<String,String>> ssio, List<ByteBuffer> authorizations, boolean waitForWrites, boolean isolated, long readaheadThreshold) throws org.apache.thrift.TException
+    public void send_startScan(org.apache.accumulo.core.trace.thrift.TInfo tinfo, org.apache.accumulo.core.security.thrift.TCredentials credentials, org.apache.accumulo.core.data.thrift.TKeyExtent extent, org.apache.accumulo.core.data.thrift.TRange range, List<org.apache.accumulo.core.data.thrift.TColumn> columns, int batchSize, List<org.apache.accumulo.core.data.thrift.IterInfo> ssiList, Map<String,Map<String,String>> ssio, List<ByteBuffer> authorizations, boolean waitForWrites, boolean isolated, long readaheadThreshold, long batchTimeOut) throws org.apache.thrift.TException
     {
       startScan_args args = new startScan_args();
       args.setTinfo(tinfo);
@@ -223,6 +223,7 @@ import org.slf4j.LoggerFactory;
       args.setWaitForWrites(waitForWrites);
       args.setIsolated(isolated);
       args.setReadaheadThreshold(readaheadThreshold);
+      args.setBatchTimeOut(batchTimeOut);
       sendBase("startScan", args);
     }
 
@@ -291,13 +292,13 @@ import org.slf4j.LoggerFactory;
       sendBase("closeScan", args);
     }
 
-    public org.apache.accumulo.core.data.thrift.InitialMultiScan startMultiScan(org.apache.accumulo.core.trace.thrift.TInfo tinfo, org.apache.accumulo.core.security.thrift.TCredentials credentials, Map<org.apache.accumulo.core.data.thrift.TKeyExtent,List<org.apache.accumulo.core.data.thrift.TRange>> batch, List<org.apache.accumulo.core.data.thrift.TColumn> columns, List<org.apache.accumulo.core.data.thrift.IterInfo> ssiList, Map<String,Map<String,String>> ssio, List<ByteBuffer> authorizations, boolean waitForWrites) throws org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException, org.apache.thrift.TException
+    public org.apache.accumulo.core.data.thrift.InitialMultiScan startMultiScan(org.apache.accumulo.core.trace.thrift.TInfo tinfo, org.apache.accumulo.core.security.thrift.TCredentials credentials, Map<org.apache.accumulo.core.data.thrift.TKeyExtent,List<org.apache.accumulo.core.data.thrift.TRange>> batch, List<org.apache.accumulo.core.data.thrift.TColumn> columns, List<org.apache.accumulo.core.data.thrift.IterInfo> ssiList, Map<String,Map<String,String>> ssio, List<ByteBuffer> authorizations, boolean waitForWrites, long batchTimeOut) throws org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException, org.apache.thrift.TException
     {
-      send_startMultiScan(tinfo, credentials, batch, columns, ssiList, ssio, authorizations, waitForWrites);
+      send_startMultiScan(tinfo, credentials, batch, columns, ssiList, ssio, authorizations, waitForWrites, batchTimeOut);
       return recv_startMultiScan();
     }
 
-    public void send_startMultiScan(org.apache.accumulo.core.trace.thrift.TInfo tinfo, org.apache.accumulo.core.security.thrift.TCredentials credentials, Map<org.apache.accumulo.core.data.thrift.TKeyExtent,List<org.apache.accumulo.core.data.thrift.TRange>> batch, List<org.apache.accumulo.core.data.thrift.TColumn> columns, List<org.apache.accumulo.core.data.thrift.IterInfo> ssiList, Map<String,Map<String,String>> ssio, List<ByteBuffer> authorizations, boolean waitForWrites) throws org.apache.thrift.TException
+    public void send_startMultiScan(org.apache.accumulo.core.trace.thrift.TInfo tinfo, org.apache.accumulo.core.security.thrift.TCredentials credentials, Map<org.apache.accumulo.core.data.thrift.TKeyExtent,List<org.apache.accumulo.core.data.thrift.TRange>> batch, List<org.apache.accumulo.core.data.thrift.TColumn> columns, List<org.apache.accumulo.core.data.thrift.IterInfo> ssiList, Map<String,Map<String,String>> ssio, List<ByteBuffer> authorizations, boolean waitForWrites, long batchTimeOut) throws org.apache.thrift.TException
     {
       startMultiScan_args args = new startMultiScan_args();
       args.setTinfo(tinfo);
@@ -308,6 +309,7 @@ import org.slf4j.LoggerFactory;
       args.setSsio(ssio);
       args.setAuthorizations(authorizations);
       args.setWaitForWrites(waitForWrites);
+      args.setBatchTimeOut(batchTimeOut);
       sendBase("startMultiScan", args);
     }
 
@@ -956,9 +958,9 @@ import org.slf4j.LoggerFactory;
       super(protocolFactory, clientManager, transport);
     }
 
-    public void startScan(org.apache.accumulo.core.trace.thrift.TInfo tinfo, org.apache.accumulo.core.security.thrift.TCredentials credentials, org.apache.accumulo.core.data.thrift.TKeyExtent extent, org.apache.accumulo.core.data.thrift.TRange range, List<org.apache.accumulo.core.data.thrift.TColumn> columns, int batchSize, List<org.apache.accumulo.core.data.thrift.IterInfo> ssiList, Map<String,Map<String,String>> ssio, List<ByteBuffer> authorizations, boolean waitForWrites, boolean isolated, long readaheadThreshold, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException {
+    public void startScan(org.apache.accumulo.core.trace.thrift.TInfo tinfo, org.apache.accumulo.core.security.thrift.TCredentials credentials, org.apache.accumulo.core.data.thrift.TKeyExtent extent, org.apache.accumulo.core.data.thrift.TRange range, List<org.apache.accumulo.core.data.thrift.TColumn> columns, int batchSize, List<org.apache.accumulo.core.data.thrift.IterInfo> ssiList, Map<String,Map<String,String>> ssio, List<ByteBuffer> authorizations, boolean waitForWrites, boolean isolated, long readaheadThreshold, long batchTimeOut, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException {
       checkReady();
-      startScan_call method_call = new startScan_call(tinfo, credentials, extent, range, columns, batchSize, ssiList, ssio, authorizations, waitForWrites, isolated, readaheadThreshold, resultHandler, this, ___protocolFactory, ___transport);
+      startScan_call method_call = new startScan_call(tinfo, credentials, extent, range, columns, batchSize, ssiList, ssio, authorizations, waitForWrites, isolated, readaheadThreshold, batchTimeOut, resultHandler, this, ___protocolFactory, ___transport);
       this.___currentMethod = method_call;
       ___manager.call(method_call);
     }
@@ -976,7 +978,8 @@ import org.slf4j.LoggerFactory;
       private boolean waitForWrites;
       private boolean isolated;
       private long readaheadThreshold;
-      public startScan_call(org.apache.accumulo.core.trace.thrift.TInfo tinfo, org.apache.accumulo.core.security.thrift.TCredentials credentials, org.apache.accumulo.core.data.thrift.TKeyExtent extent, org.apache.accumulo.core.data.thrift.TRange range, List<org.apache.accumulo.core.data.thrift.TColumn> columns, int batchSize, List<org.apache.accumulo.core.data.thrift.IterInfo> ssiList, Map<String,Map<String,String>> ssio, List<ByteBuffer> authorizations, boolean waitForWrites, boolean isolated, long readaheadThreshold, org.apache.thrift.async.AsyncMethodCallback resultHandler, org.apache.thrift.async.TAsyncClient client, org.apache.thrift.protocol.TProtocolFactory protocolFactory, org.apache.thrift.transport.TNonblockingTransport transport) throws org.apache.thrift.TException {
+      private long batchTimeOut;
+      public startScan_call(org.apache.accumulo.core.trace.thrift.TInfo tinfo, org.apache.accumulo.core.security.thrift.TCredentials credentials, org.apache.accumulo.core.data.thrift.TKeyExtent extent, org.apache.accumulo.core.data.thrift.TRange range, List<org.apache.accumulo.core.data.thrift.TColumn> columns, int batchSize, List<org.apache.accumulo.core.data.thrift.IterInfo> ssiList, Map<String,Map<String,String>> ssio, List<ByteBuffer> authorizations, boolean waitForWrites, boolean isolated, long readaheadThreshold, long batchTimeOut, org.apache.thrift.async.AsyncMethodCallback resultHandler, org.apache.thrift.async.TAsyncClient client, org.apache.thrift.protocol.TProtocolFactory protocolFactory, org.apache.thrift.transport.TNonblockingTransport transport) throws org.apache.thrift.TException {
         super(client, protocolFactory, transport, resultHandler, false);
         this.tinfo = tinfo;
         this.credentials = credentials;
@@ -990,6 +993,7 @@ import org.slf4j.LoggerFactory;
         this.waitForWrites = waitForWrites;
         this.isolated = isolated;
         this.readaheadThreshold = readaheadThreshold;
+        this.batchTimeOut = batchTimeOut;
       }
 
       public void write_args(org.apache.thrift.protocol.TProtocol prot) throws org.apache.thrift.TException {
@@ -1007,6 +1011,7 @@ import org.slf4j.LoggerFactory;
         args.setWaitForWrites(waitForWrites);
         args.setIsolated(isolated);
         args.setReadaheadThreshold(readaheadThreshold);
+        args.setBatchTimeOut(batchTimeOut);
         args.write(prot);
         prot.writeMessageEnd();
       }
@@ -1090,9 +1095,9 @@ import org.slf4j.LoggerFactory;
       }
     }
 
-    public void startMultiScan(org.apache.accumulo.core.trace.thrift.TInfo tinfo, org.apache.accumulo.core.security.thrift.TCredentials credentials, Map<org.apache.accumulo.core.data.thrift.TKeyExtent,List<org.apache.accumulo.core.data.thrift.TRange>> batch, List<org.apache.accumulo.core.data.thrift.TColumn> columns, List<org.apache.accumulo.core.data.thrift.IterInfo> ssiList, Map<String,Map<String,String>> ssio, List<ByteBuffer> authorizations, boolean waitForWrites, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException {
+    public void startMultiScan(org.apache.accumulo.core.trace.thrift.TInfo tinfo, org.apache.accumulo.core.security.thrift.TCredentials credentials, Map<org.apache.accumulo.core.data.thrift.TKeyExtent,List<org.apache.accumulo.core.data.thrift.TRange>> batch, List<org.apache.accumulo.core.data.thrift.TColumn> columns, List<org.apache.accumulo.core.data.thrift.IterInfo> ssiList, Map<String,Map<String,String>> ssio, List<ByteBuffer> authorizations, boolean waitForWrites, long batchTimeOut, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException {
       checkReady();
-      startMultiScan_call method_call = new startMultiScan_call(tinfo, credentials, batch, columns, ssiList, ssio, authorizations, waitForWrites, resultHandler, this, ___protocolFactory, ___transport);
+      startMultiScan_call method_call = new startMultiScan_call(tinfo, credentials, batch, columns, ssiList, ssio, authorizations, waitForWrites, batchTimeOut, resultHandler, this, ___protocolFactory, ___transport);
       this.___currentMethod = method_call;
       ___manager.call(method_call);
     }
@@ -1106,7 +1111,8 @@ import org.slf4j.LoggerFactory;
       private Map<String,Map<String,String>> ssio;
       private List<ByteBuffer> authorizations;
       private boolean waitForWrites;
-      public startMultiScan_call(org.apache.accumulo.core.trace.thrift.TInfo tinfo, org.apache.accumulo.core.security.thrift.TCredentials credentials, Map<org.apache.accumulo.core.data.thrift.TKeyExtent,List<org.apache.accumulo.core.data.thrift.TRange>> batch, List<org.apache.accumulo.core.data.thrift.TColumn> columns, List<org.apache.accumulo.core.data.thrift.IterInfo> ssiList, Map<String,Map<String,String>> ssio, List<ByteBuffer> authorizations, boolean waitForWrites, org.apache.thrift.async.AsyncMethodCallback resultHandler, org.apache.thrift.async.TAsyncClient client, org.apache.thrift.protocol.TProtocolFactory protocolFactory, org.apache.thrift.transport.TNonblockingTransport transport) throws org.apache.thrift.TException {
+      private long batchTimeOut;
+      public startMultiScan_call(org.apache.accumulo.core.trace.thrift.TInfo tinfo, org.apache.accumulo.core.security.thrift.TCredentials credentials, Map<org.apache.accumulo.core.data.thrift.TKeyExtent,List<org.apache.accumulo.core.data.thrift.TRange>> batch, List<org.apache.accumulo.core.data.thrift.TColumn> columns, List<org.apache.accumulo.core.data.thrift.IterInfo> ssiList, Map<String,Map<String,String>> ssio, List<ByteBuffer> authorizations, boolean waitForWrites, long batchTimeOut, org.apache.thrift.async.AsyncMethodCallback resultHandler, org.apache.thrift.async.TAsyncClient client, org.apache.thrift.protocol.TProtocolFactory protocolFactory, org.apache.thrift.transport.TNonblockingTransport transport) throws org.apache.thrift.TException {
         super(client, protocolFactory, transport, resultHandler, false);
         this.tinfo = tinfo;
         this.credentials = credentials;
@@ -1116,6 +1122,7 @@ import org.slf4j.LoggerFactory;
         this.ssio = ssio;
         this.authorizations = authorizations;
         this.waitForWrites = waitForWrites;
+        this.batchTimeOut = batchTimeOut;
       }
 
       public void write_args(org.apache.thrift.protocol.TProtocol prot) throws org.apache.thrift.TException {
@@ -1129,6 +1136,7 @@ import org.slf4j.LoggerFactory;
         args.setSsio(ssio);
         args.setAuthorizations(authorizations);
         args.setWaitForWrites(waitForWrites);
+        args.setBatchTimeOut(batchTimeOut);
         args.write(prot);
         prot.writeMessageEnd();
       }
@@ -2252,7 +2260,7 @@ import org.slf4j.LoggerFactory;
       public startScan_result getResult(I iface, startScan_args args) throws org.apache.thrift.TException {
         startScan_result result = new startScan_result();
         try {
-          result.success = iface.startScan(args.tinfo, args.credentials, args.extent, args.range, args.columns, args.batchSize, args.ssiList, args.ssio, args.authorizations, args.waitForWrites, args.isolated, args.readaheadThreshold);
+          result.success = iface.startScan(args.tinfo, args.credentials, args.extent, args.range, args.columns, args.batchSize, args.ssiList, args.ssio, args.authorizations, args.waitForWrites, args.isolated, args.readaheadThreshold, args.batchTimeOut);
         } catch (org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException sec) {
           result.sec = sec;
         } catch (NotServingTabletException nste) {
@@ -2327,7 +2335,7 @@ import org.slf4j.LoggerFactory;
       public startMultiScan_result getResult(I iface, startMultiScan_args args) throws org.apache.thrift.TException {
         startMultiScan_result result = new startMultiScan_result();
         try {
-          result.success = iface.startMultiScan(args.tinfo, args.credentials, args.batch, args.columns, args.ssiList, args.ssio, args.authorizations, args.waitForWrites);
+          result.success = iface.startMultiScan(args.tinfo, args.credentials, args.batch, args.columns, args.ssiList, args.ssio, args.authorizations, args.waitForWrites, args.batchTimeOut);
         } catch (org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException sec) {
           result.sec = sec;
         }
@@ -3042,7 +3050,7 @@ import org.slf4j.LoggerFactory;
       }
 
       public void start(I iface, startScan_args args, org.apache.thrift.async.AsyncMethodCallback<org.apache.accumulo.core.data.thrift.InitialScan> resultHandler) throws TException {
-        iface.startScan(args.tinfo, args.credentials, args.extent, args.range, args.columns, args.batchSize, args.ssiList, args.ssio, args.authorizations, args.waitForWrites, args.isolated, args.readaheadThreshold,resultHandler);
+        iface.startScan(args.tinfo, args.credentials, args.extent, args.range, args.columns, args.batchSize, args.ssiList, args.ssio, args.authorizations, args.waitForWrites, args.isolated, args.readaheadThreshold, args.batchTimeOut,resultHandler);
       }
     }
 
@@ -3194,7 +3202,7 @@ import org.slf4j.LoggerFactory;
       }
 
       public void start(I iface, startMultiScan_args args, org.apache.thrift.async.AsyncMethodCallback<org.apache.accumulo.core.data.thrift.InitialMultiScan> resultHandler) throws TException {
-        iface.startMultiScan(args.tinfo, args.credentials, args.batch, args.columns, args.ssiList, args.ssio, args.authorizations, args.waitForWrites,resultHandler);
+        iface.startMultiScan(args.tinfo, args.credentials, args.batch, args.columns, args.ssiList, args.ssio, args.authorizations, args.waitForWrites, args.batchTimeOut,resultHandler);
       }
     }
 
@@ -4463,6 +4471,7 @@ import org.slf4j.LoggerFactory;
     private static final org.apache.thrift.protocol.TField WAIT_FOR_WRITES_FIELD_DESC = new org.apache.thrift.protocol.TField("waitForWrites", org.apache.thrift.protocol.TType.BOOL, (short)9);
     private static final org.apache.thrift.protocol.TField ISOLATED_FIELD_DESC = new org.apache.thrift.protocol.TField("isolated", org.apache.thrift.protocol.TType.BOOL, (short)10);
     private static final org.apache.thrift.protocol.TField READAHEAD_THRESHOLD_FIELD_DESC = new org.apache.thrift.protocol.TField("readaheadThreshold", org.apache.thrift.protocol.TType.I64, (short)12);
+    private static final org.apache.thrift.protocol.TField BATCH_TIME_OUT_FIELD_DESC = new org.apache.thrift.protocol.TField("batchTimeOut", org.apache.thrift.protocol.TType.I64, (short)13);
 
     private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>();
     static {
@@ -4482,6 +4491,7 @@ import org.slf4j.LoggerFactory;
     public boolean waitForWrites; // required
     public boolean isolated; // required
     public long readaheadThreshold; // required
+    public long batchTimeOut; // required
 
     /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
     public enum _Fields implements org.apache.thrift.TFieldIdEnum {
@@ -4496,7 +4506,8 @@ import org.slf4j.LoggerFactory;
       AUTHORIZATIONS((short)8, "authorizations"),
       WAIT_FOR_WRITES((short)9, "waitForWrites"),
       ISOLATED((short)10, "isolated"),
-      READAHEAD_THRESHOLD((short)12, "readaheadThreshold");
+      READAHEAD_THRESHOLD((short)12, "readaheadThreshold"),
+      BATCH_TIME_OUT((short)13, "batchTimeOut");
 
       private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
 
@@ -4535,6 +4546,8 @@ import org.slf4j.LoggerFactory;
             return ISOLATED;
           case 12: // READAHEAD_THRESHOLD
             return READAHEAD_THRESHOLD;
+          case 13: // BATCH_TIME_OUT
+            return BATCH_TIME_OUT;
           default:
             return null;
         }
@@ -4579,6 +4592,7 @@ import org.slf4j.LoggerFactory;
     private static final int __WAITFORWRITES_ISSET_ID = 1;
     private static final int __ISOLATED_ISSET_ID = 2;
     private static final int __READAHEADTHRESHOLD_ISSET_ID = 3;
+    private static final int __BATCHTIMEOUT_ISSET_ID = 4;
     private byte __isset_bitfield = 0;
     public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
     static {
@@ -4614,6 +4628,8 @@ import org.slf4j.LoggerFactory;
           new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.BOOL)));
       tmpMap.put(_Fields.READAHEAD_THRESHOLD, new org.apache.thrift.meta_data.FieldMetaData("readaheadThreshold", org.apache.thrift.TFieldRequirementType.DEFAULT, 
           new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I64)));
+      tmpMap.put(_Fields.BATCH_TIME_OUT, new org.apache.thrift.meta_data.FieldMetaData("batchTimeOut", org.apache.thrift.TFieldRequirementType.DEFAULT, 
+          new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I64)));
       metaDataMap = Collections.unmodifiableMap(tmpMap);
       org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(startScan_args.class, metaDataMap);
     }
@@ -4633,7 +4649,8 @@ import org.slf4j.LoggerFactory;
       List<ByteBuffer> authorizations,
       boolean waitForWrites,
       boolean isolated,
-      long readaheadThreshold)
+      long readaheadThreshold,
+      long batchTimeOut)
     {
       this();
       this.tinfo = tinfo;
@@ -4652,6 +4669,8 @@ import org.slf4j.LoggerFactory;
       setIsolatedIsSet(true);
       this.readaheadThreshold = readaheadThreshold;
       setReadaheadThresholdIsSet(true);
+      this.batchTimeOut = batchTimeOut;
+      setBatchTimeOutIsSet(true);
     }
 
     /**
@@ -4708,6 +4727,7 @@ import org.slf4j.LoggerFactory;
       this.waitForWrites = other.waitForWrites;
       this.isolated = other.isolated;
       this.readaheadThreshold = other.readaheadThreshold;
+      this.batchTimeOut = other.batchTimeOut;
     }
 
     public startScan_args deepCopy() {
@@ -4732,6 +4752,8 @@ import org.slf4j.LoggerFactory;
       this.isolated = false;
       setReadaheadThresholdIsSet(false);
       this.readaheadThreshold = 0;
+      setBatchTimeOutIsSet(false);
+      this.batchTimeOut = 0;
     }
 
     public org.apache.accumulo.core.trace.thrift.TInfo getTinfo() {
@@ -5074,6 +5096,29 @@ import org.slf4j.LoggerFactory;
       __isset_bitfield = EncodingUtils.setBit(__isset_bitfield, __READAHEADTHRESHOLD_ISSET_ID, value);
     }
 
+    public long getBatchTimeOut() {
+      return this.batchTimeOut;
+    }
+
+    public startScan_args setBatchTimeOut(long batchTimeOut) {
+      this.batchTimeOut = batchTimeOut;
+      setBatchTimeOutIsSet(true);
+      return this;
+    }
+
+    public void unsetBatchTimeOut() {
+      __isset_bitfield = EncodingUtils.clearBit(__isset_bitfield, __BATCHTIMEOUT_ISSET_ID);
+    }
+
+    /** Returns true if field batchTimeOut is set (has been assigned a value) and false otherwise */
+    public boolean isSetBatchTimeOut() {
+      return EncodingUtils.testBit(__isset_bitfield, __BATCHTIMEOUT_ISSET_ID);
+    }
+
+    public void setBatchTimeOutIsSet(boolean value) {
+      __isset_bitfield = EncodingUtils.setBit(__isset_bitfield, __BATCHTIMEOUT_ISSET_ID, value);
+    }
+
     public void setFieldValue(_Fields field, Object value) {
       switch (field) {
       case TINFO:
@@ -5172,6 +5217,14 @@ import org.slf4j.LoggerFactory;
         }
         break;
 
+      case BATCH_TIME_OUT:
+        if (value == null) {
+          unsetBatchTimeOut();
+        } else {
+          setBatchTimeOut((Long)value);
+        }
+        break;
+
       }
     }
 
@@ -5213,6 +5266,9 @@ import org.slf4j.LoggerFactory;
       case READAHEAD_THRESHOLD:
         return Long.valueOf(getReadaheadThreshold());
 
+      case BATCH_TIME_OUT:
+        return Long.valueOf(getBatchTimeOut());
+
       }
       throw new IllegalStateException();
     }
@@ -5248,6 +5304,8 @@ import org.slf4j.LoggerFactory;
         return isSetIsolated();
       case READAHEAD_THRESHOLD:
         return isSetReadaheadThreshold();
+      case BATCH_TIME_OUT:
+        return isSetBatchTimeOut();
       }
       throw new IllegalStateException();
     }
@@ -5373,6 +5431,15 @@ import org.slf4j.LoggerFactory;
           return false;
       }
 
+      boolean this_present_batchTimeOut = true;
+      boolean that_present_batchTimeOut = true;
+      if (this_present_batchTimeOut || that_present_batchTimeOut) {
+        if (!(this_present_batchTimeOut && that_present_batchTimeOut))
+          return false;
+        if (this.batchTimeOut != that.batchTimeOut)
+          return false;
+      }
+
       return true;
     }
 
@@ -5509,6 +5576,16 @@ import org.slf4j.LoggerFactory;
           return lastComparison;
         }
       }
+      lastComparison = Boolean.valueOf(isSetBatchTimeOut()).compareTo(other.isSetBatchTimeOut());
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+      if (isSetBatchTimeOut()) {
+        lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.batchTimeOut, other.batchTimeOut);
+        if (lastComparison != 0) {
+          return lastComparison;
+        }
+      }
       return 0;
     }
 
@@ -5608,6 +5685,10 @@ import org.slf4j.LoggerFactory;
       sb.append("readaheadThreshold:");
       sb.append(this.readaheadThreshold);
       first = false;
+      if (!first) sb.append(", ");
+      sb.append("batchTimeOut:");
+      sb.append(this.batchTimeOut);
+      first = false;
       sb.append(")");
       return sb.toString();
     }
@@ -5821,6 +5902,14 @@ import org.slf4j.LoggerFactory;
                 org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
               }
               break;
+            case 13: // BATCH_TIME_OUT
+              if (schemeField.type == org.apache.thrift.protocol.TType.I64) {
+                struct.batchTimeOut = iprot.readI64();
+                struct.setBatchTimeOutIsSet(true);
+              } else { 
+                org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+              }
+              break;
             default:
               org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
           }
@@ -5925,6 +6014,9 @@ import org.slf4j.LoggerFactory;
         oprot.writeFieldBegin(READAHEAD_THRESHOLD_FIELD_DESC);
         oprot.writeI64(struct.readaheadThreshold);
         oprot.writeFieldEnd();
+        oprot.writeFieldBegin(BATCH_TIME_OUT_FIELD_DESC);
+        oprot.writeI64(struct.batchTimeOut);
+        oprot.writeFieldEnd();
         oprot.writeFieldStop();
         oprot.writeStructEnd();
       }
@@ -5979,7 +6071,10 @@ import org.slf4j.LoggerFactory;
         if (struct.isSetReadaheadThreshold()) {
           optionals.set(11);
         }
-        oprot.writeBitSet(optionals, 12);
+        if (struct.isSetBatchTimeOut()) {
+          optionals.set(12);
+        }
+        oprot.writeBitSet(optionals, 13);
         if (struct.isSetTinfo()) {
           struct.tinfo.write(oprot);
         }
@@ -6048,12 +6143,15 @@ import org.slf4j.LoggerFactory;
         if (struct.isSetReadaheadThreshold()) {
           oprot.writeI64(struct.readaheadThreshold);
         }
+        if (struct.isSetBatchTimeOut()) {
+          oprot.writeI64(struct.batchTimeOut);
+        }
       }
 
       @Override
       public void read(org.apache.thrift.protocol.TProtocol prot, startScan_args struct) throws org.apache.thrift.TException {
         TTupleProtocol iprot = (TTupleProtocol) prot;
-        BitSet incoming = iprot.readBitSet(12);
+        BitSet incoming = iprot.readBitSet(13);
         if (incoming.get(0)) {
           struct.tinfo = new org.apache.accumulo.core.trace.thrift.TInfo();
           struct.tinfo.read(iprot);
@@ -6157,6 +6255,10 @@ import org.slf4j.LoggerFactory;
           struct.readaheadThreshold = iprot.readI64();
           struct.setReadaheadThresholdIsSet(true);
         }
+        if (incoming.get(12)) {
+          struct.batchTimeOut = iprot.readI64();
+          struct.setBatchTimeOutIsSet(true);
+        }
       }
     }
 
@@ -8417,6 +8519,7 @@ import org.slf4j.LoggerFactory;
     private static final org.apache.thrift.protocol.TField SSIO_FIELD_DESC = new org.apache.thrift.protocol.TField("ssio", org.apache.thrift.protocol.TType.MAP, (short)5);
     private static final org.apache.thrift.protocol.TField AUTHORIZATIONS_FIELD_DESC = new org.apache.thrift.protocol.TField("authorizations", org.apache.thrift.protocol.TType.LIST, (short)6);
     private static final org.apache.thrift.protocol.TField WAIT_FOR_WRITES_FIELD_DESC = new org.apache.thrift.protocol.TField("waitForWrites", org.apache.thrift.protocol.TType.BOOL, (short)7);
+    private static final org.apache.thrift.protocol.TField BATCH_TIME_OUT_FIELD_DESC = new org.apache.thrift.protocol.TField("batchTimeOut", org.apache.thrift.protocol.TType.I64, (short)9);
 
     private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>();
     static {
@@ -8432,6 +8535,7 @@ import org.slf4j.LoggerFactory;
     public Map<String,Map<String,String>> ssio; // required
     public List<ByteBuffer> authorizations; // required
     public boolean waitForWrites; // required
+    public long batchTimeOut; // required
 
     /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
     public enum _Fields implements org.apache.thrift.TFieldIdEnum {
@@ -8442,7 +8546,8 @@ import org.slf4j.LoggerFactory;
       SSI_LIST((short)4, "ssiList"),
       SSIO((short)5, "ssio"),
       AUTHORIZATIONS((short)6, "authorizations"),
-      WAIT_FOR_WRITES((short)7, "waitForWrites");
+      WAIT_FOR_WRITES((short)7, "waitForWrites"),
+      BATCH_TIME_OUT((short)9, "batchTimeOut");
 
       private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
 
@@ -8473,6 +8578,8 @@ import org.slf4j.LoggerFactory;
             return AUTHORIZATIONS;
           case 7: // WAIT_FOR_WRITES
             return WAIT_FOR_WRITES;
+          case 9: // BATCH_TIME_OUT
+            return BATCH_TIME_OUT;
           default:
             return null;
         }
@@ -8514,6 +8621,7 @@ import org.slf4j.LoggerFactory;
 
     // isset id assignments
     private static final int __WAITFORWRITES_ISSET_ID = 0;
+    private static final int __BATCHTIMEOUT_ISSET_ID = 1;
     private byte __isset_bitfield = 0;
     public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
     static {
@@ -8541,6 +8649,8 @@ import org.slf4j.LoggerFactory;
               new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING              , true))));
       tmpMap.put(_Fields.WAIT_FOR_WRITES, new org.apache.thrift.meta_data.FieldMetaData("waitForWrites", org.apache.thrift.TFieldRequirementType.DEFAULT, 
           new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.BOOL)));
+      tmpMap.put(_Fields.BATCH_TIME_OUT, new org.apache.thrift.meta_data.FieldMetaData("batchTimeOut", org.apache.thrift.TFieldRequirementType.DEFAULT, 
+          new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I64)));
       metaDataMap = Collections.unmodifiableMap(tmpMap);
       org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(startMultiScan_args.class, metaDataMap);
     }
@@ -8556,7 +8666,8 @@ import org.slf4j.LoggerFactory;
       List<org.apache.accumulo.core.data.thrift.IterInfo> ssiList,
       Map<String,Map<String,String>> ssio,
       List<ByteBuffer> authorizations,
-      boolean waitForWrites)
+      boolean waitForWrites,
+      long batchTimeOut)
     {
       this();
       this.tinfo = tinfo;
@@ -8568,6 +8679,8 @@ import org.slf4j.LoggerFactory;
       this.authorizations = authorizations;
       this.waitForWrites = waitForWrites;
       setWaitForWritesIsSet(true);
+      this.batchTimeOut = batchTimeOut;
+      setBatchTimeOutIsSet(true);
     }
 
     /**
@@ -8618,6 +8731,7 @@ import org.slf4j.LoggerFactory;
         this.authorizations = __this__authorizations;
       }
       this.waitForWrites = other.waitForWrites;
+      this.batchTimeOut = other.batchTimeOut;
     }
 
     public startMultiScan_args deepCopy() {
@@ -8635,6 +8749,8 @@ import org.slf4j.LoggerFactory;
       this.authorizations = null;
       setWaitForWritesIsSet(false);
       this.waitForWrites = false;
+      setBatchTimeOutIsSet(false);
+      this.batchTimeOut = 0;
     }
 
     public org.apache.accumulo.core.trace.thrift.TInfo getTinfo() {
@@ -8895,6 +9011,29 @@ import org.slf4j.LoggerFactory;
       __isset_bitfield = EncodingUtils.setBit(__isset_bitfield, __WAITFORWRITES_ISSET_ID, value);
     }
 
+    public long getBatchTimeOut() {
+      return this.batchTimeOut;
+    }
+
+    public startMultiScan_args setBatchTimeOut(long batchTimeOut) {
+      this.batchTimeOut = batchTimeOut;
+      setBatchTimeOutIsSet(true);
+      return this;
+    }
+
+    public void unsetBatchTimeOut() {
+      __isset_bitfield = EncodingUtils.clearBit(__isset_bitfield, __BATCHTIMEOUT_ISSET_ID);
+    }
+
+    /** Returns true if field batchTimeOut is set (has been assigned a value) and false otherwise */
+    public boolean isSetBatchTimeOut() {
+      return EncodingUtils.testBit(__isset_bitfield, __BATCHTIMEOUT_ISSET_ID);
+    }
+
+    public void setBatchTimeOutIsSet(boolean value) {
+      __isset_bitfield = EncodingUtils.setBit(__isset_bitfield, __BATCHTIMEOUT_ISSET_ID, value);
+    }
+
     public void setFieldValue(_Fields field, Object value) {
       switch (field) {
       case TINFO:
@@ -8961,6 +9100,14 @@ import org.slf4j.LoggerFactory;
         }
         break;
 
+      case BATCH_TIME_OUT:
+        if (value == null) {
+          unsetBatchTimeOut();
+        } else {
+          setBatchTimeOut((Long)value);
+        }
+        break;
+
       }
     }
 
@@ -8990,6 +9137,9 @@ import org.slf4j.LoggerFactory;
       case WAIT_FOR_WRITES:
         return Boolean.valueOf(isWaitForWrites());
 
+      case BATCH_TIME_OUT:
+        return Long.valueOf(getBatchTimeOut());
+
       }
       throw new IllegalStateException();
     }
@@ -9017,6 +9167,8 @@ import org.slf4j.LoggerFactory;
         return isSetAuthorizations();
       case WAIT_FOR_WRITES:
         return isSetWaitForWrites();
+      case BATCH_TIME_OUT:
+        return isSetBatchTimeOut();
       }
       throw new IllegalStateException();
     }
@@ -9106,6 +9258,15 @@ import org.slf4j.LoggerFactory;
           return false;
       }
 
+      boolean this_present_batchTimeOut = true;
+      boolean that_present_batchTimeOut = true;
+      if (this_present_batchTimeOut || that_present_batchTimeOut) {
+        if (!(this_present_batchTimeOut && that_present_batchTimeOut))
+          return false;
+        if (this.batchTimeOut != that.batchTimeOut)
+          return false;
+      }
+
       return true;
     }
 
@@ -9202,6 +9363,16 @@ import org.slf4j.LoggerFactory;
           return lastComparison;
         }
       }
+      lastComparison = Boolean.valueOf(isSetBatchTimeOut()).compareTo(other.isSetBatchTimeOut());
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+      if (isSetBatchTimeOut()) {
+        lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.batchTimeOut, other.batchTimeOut);
+        if (lastComparison != 0) {
+          return lastComparison;
+        }
+      }
       return 0;
     }
 
@@ -9281,6 +9452,10 @@ import org.slf4j.LoggerFactory;
       sb.append("waitForWrites:");
       sb.append(this.waitForWrites);
       first = false;
+      if (!first) sb.append(", ");
+      sb.append("batchTimeOut:");
+      sb.append(this.batchTimeOut);
+      first = false;
       sb.append(")");
       return sb.toString();
     }
@@ -9478,6 +9653,14 @@ import org.slf4j.LoggerFactory;
                 org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
               }
               break;
+            case 9: // BATCH_TIME_OUT
+              if (schemeField.type == org.apache.thrift.protocol.TType.I64) {
+                struct.batchTimeOut = iprot.readI64();
+                struct.setBatchTimeOutIsSet(true);
+              } else { 
+                org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+              }
+              break;
             default:
               org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
           }
@@ -9583,6 +9766,9 @@ import org.slf4j.LoggerFactory;
           struct.tinfo.write(oprot);
           oprot.writeFieldEnd();
         }
+        oprot.writeFieldBegin(BATCH_TIME_OUT_FIELD_DESC);
+        oprot.writeI64(struct.batchTimeOut);
+        oprot.writeFieldEnd();
         oprot.writeFieldStop();
         oprot.writeStructEnd();
       }
@@ -9625,7 +9811,10 @@ import org.slf4j.LoggerFactory;
         if (struct.isSetWaitForWrites()) {
           optionals.set(7);
         }
-        oprot.writeBitSet(optionals, 8);
+        if (struct.isSetBatchTimeOut()) {
+          optionals.set(8);
+        }
+        oprot.writeBitSet(optionals, 9);
         if (struct.isSetTinfo()) {
           struct.tinfo.write(oprot);
         }
@@ -9695,12 +9884,15 @@ import org.slf4j.LoggerFactory;
         if (struct.isSetWaitForWrites()) {
           oprot.writeBool(struct.waitForWrites);
         }
+        if (struct.isSetBatchTimeOut()) {
+          oprot.writeI64(struct.batchTimeOut);
+        }
       }
 
       @Override
       public void read(org.apache.thrift.protocol.TProtocol prot, startMultiScan_args struct) throws org.apache.thrift.TException {
         TTupleProtocol iprot = (TTupleProtocol) prot;
-        BitSet incoming = iprot.readBitSet(8);
+        BitSet incoming = iprot.readBitSet(9);
         if (incoming.get(0)) {
           struct.tinfo = new org.apache.accumulo.core.trace.thrift.TInfo();
           struct.tinfo.read(iprot);
@@ -9808,6 +10000,10 @@ import org.slf4j.LoggerFactory;
           struct.waitForWrites = iprot.readBool();
           struct.setWaitForWritesIsSet(true);
         }
+        if (incoming.get(8)) {
+          struct.batchTimeOut = iprot.readI64();
+          struct.setBatchTimeOutIsSet(true);
+        }
       }
     }
 

http://git-wip-us.apache.org/repos/asf/accumulo/blob/81994647/core/src/main/thrift/tabletserver.thrift
----------------------------------------------------------------------
diff --git a/core/src/main/thrift/tabletserver.thrift b/core/src/main/thrift/tabletserver.thrift
index 4a31036..051daee 100644
--- a/core/src/main/thrift/tabletserver.thrift
+++ b/core/src/main/thrift/tabletserver.thrift
@@ -149,7 +149,8 @@ service TabletClientService extends client.ClientService {
                              8:list<binary> authorizations
                              9:bool waitForWrites,
                              10:bool isolated,
-                             12:i64 readaheadThreshold)  throws (1:client.ThriftSecurityException sec, 2:NotServingTabletException nste, 3:TooManyFilesException tmfe),
+                             12:i64 readaheadThreshold,
+                             13:i64 batchTimeOut)  throws (1:client.ThriftSecurityException sec, 2:NotServingTabletException nste, 3:TooManyFilesException tmfe),
                              
   data.ScanResult continueScan(2:trace.TInfo tinfo, 1:data.ScanID scanID)  throws (1:NoSuchScanIDException nssi, 2:NotServingTabletException nste, 3:TooManyFilesException tmfe),
   oneway void closeScan(2:trace.TInfo tinfo, 1:data.ScanID scanID),
@@ -161,8 +162,9 @@ service TabletClientService extends client.ClientService {
                                   3:list<data.TColumn> columns,
                                   4:list<data.IterInfo> ssiList,
                                   5:map<string, map<string, string>> ssio,
-                                  6:list<binary> authorizations
-                                  7:bool waitForWrites)  throws (1:client.ThriftSecurityException sec),
+                                  6:list<binary> authorizations,
+                                  7:bool waitForWrites,
+                                  9:i64 batchTimeOut)  throws (1:client.ThriftSecurityException sec),
   data.MultiScanResult continueMultiScan(2:trace.TInfo tinfo, 1:data.ScanID scanID) throws (1:NoSuchScanIDException nssi),
   void closeMultiScan(2:trace.TInfo tinfo, 1:data.ScanID scanID) throws (1:NoSuchScanIDException nssi),
   

http://git-wip-us.apache.org/repos/asf/accumulo/blob/81994647/core/src/test/java/org/apache/accumulo/core/client/impl/TableOperationsImplTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/client/impl/TableOperationsImplTest.java b/core/src/test/java/org/apache/accumulo/core/client/impl/TableOperationsImplTest.java
index 523d157..7351ede 100644
--- a/core/src/test/java/org/apache/accumulo/core/client/impl/TableOperationsImplTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/client/impl/TableOperationsImplTest.java
@@ -74,6 +74,7 @@ public class TableOperationsImplTest {
     // IsolatedScanner -- make the verification pass, not really relevant
     EasyMock.expect(scanner.getRange()).andReturn(range).anyTimes();
     EasyMock.expect(scanner.getTimeout(TimeUnit.MILLISECONDS)).andReturn(Long.MAX_VALUE);
+    EasyMock.expect(scanner.getBatchTimeout(TimeUnit.MILLISECONDS)).andReturn(Long.MAX_VALUE);
     EasyMock.expect(scanner.getBatchSize()).andReturn(1000);
     EasyMock.expect(scanner.getReadaheadThreshold()).andReturn(100l);
 

http://git-wip-us.apache.org/repos/asf/accumulo/blob/81994647/server/base/src/main/java/org/apache/accumulo/server/util/VerifyTabletAssignments.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/VerifyTabletAssignments.java b/server/base/src/main/java/org/apache/accumulo/server/util/VerifyTabletAssignments.java
index c0c979b..0d7ade8 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/util/VerifyTabletAssignments.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/VerifyTabletAssignments.java
@@ -189,7 +189,7 @@ public class VerifyTabletAssignments {
     List<IterInfo> emptyListIterInfo = Collections.emptyList();
     List<TColumn> emptyListColumn = Collections.emptyList();
     InitialMultiScan is = client.startMultiScan(tinfo, context.rpcCreds(), batch, emptyListColumn, emptyListIterInfo, emptyMapSMapSS,
-        Authorizations.EMPTY.getAuthorizationsBB(), false);
+        Authorizations.EMPTY.getAuthorizationsBB(), false, 0L);
     if (is.result.more) {
       MultiScanResult result = client.continueMultiScan(tinfo, is.scanID);
       checkFailures(entry.getKey(), failures, result);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/81994647/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/trace/NullScanner.java
----------------------------------------------------------------------
diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/trace/NullScanner.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/trace/NullScanner.java
index 0ba13c7..750ad8e 100644
--- a/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/trace/NullScanner.java
+++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/trace/NullScanner.java
@@ -123,4 +123,15 @@ public class NullScanner implements Scanner {
   public void setReadaheadThreshold(long batches) {
 
   }
+
+  @Override
+  public void setBatchTimeout(long timeout, TimeUnit milliseconds) {
+
+  }
+
+  @Override
+  public long getBatchTimeout(TimeUnit timeUnit) {
+    return 0;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/81994647/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index 4d0b9f6..dc382f2 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -442,7 +442,8 @@ public class TabletServer extends AccumuloServerContext implements Runnable {
     @Override
     public InitialScan startScan(TInfo tinfo, TCredentials credentials, TKeyExtent textent, TRange range, List<TColumn> columns, int batchSize,
         List<IterInfo> ssiList, Map<String,Map<String,String>> ssio, List<ByteBuffer> authorizations, boolean waitForWrites, boolean isolated,
-        long readaheadThreshold) throws NotServingTabletException, ThriftSecurityException, org.apache.accumulo.core.tabletserver.thrift.TooManyFilesException {
+        long readaheadThreshold, long batchTimeOut) throws NotServingTabletException, ThriftSecurityException,
+        org.apache.accumulo.core.tabletserver.thrift.TooManyFilesException {
 
       String tableId = new String(textent.getTable(), UTF_8);
       if (!security.canScan(credentials, tableId, Tables.getNamespaceId(getInstance(), tableId), range, columns, ssiList, ssio, authorizations))
@@ -474,9 +475,10 @@ public class TabletServer extends AccumuloServerContext implements Runnable {
       for (TColumn tcolumn : columns) {
         columnSet.add(new Column(tcolumn));
       }
-      final ScanSession scanSession = new ScanSession(credentials, extent, columnSet, ssiList, ssio, new Authorizations(authorizations), readaheadThreshold);
+      final ScanSession scanSession = new ScanSession(credentials, extent, columnSet, ssiList, ssio, new Authorizations(authorizations), readaheadThreshold,
+          batchTimeOut);
       scanSession.scanner = tablet.createScanner(new Range(range), batchSize, scanSession.columnSet, scanSession.auths, ssiList, ssio, isolated,
-          scanSession.interruptFlag);
+          scanSession.interruptFlag, scanSession.batchTimeOut);
 
       long sid = sessionManager.createSession(scanSession, true);
 
@@ -588,7 +590,8 @@ public class TabletServer extends AccumuloServerContext implements Runnable {
 
     @Override
     public InitialMultiScan startMultiScan(TInfo tinfo, TCredentials credentials, Map<TKeyExtent,List<TRange>> tbatch, List<TColumn> tcolumns,
-        List<IterInfo> ssiList, Map<String,Map<String,String>> ssio, List<ByteBuffer> authorizations, boolean waitForWrites) throws ThriftSecurityException {
+        List<IterInfo> ssiList, Map<String,Map<String,String>> ssio, List<ByteBuffer> authorizations, boolean waitForWrites, long batchTimeOut)
+        throws ThriftSecurityException {
       // find all of the tables that need to be scanned
       final HashSet<String> tables = new HashSet<String>();
       for (TKeyExtent keyExtent : tbatch.keySet()) {
@@ -619,7 +622,7 @@ public class TabletServer extends AccumuloServerContext implements Runnable {
       if (waitForWrites)
         writeTracker.waitForWrites(TabletType.type(batch.keySet()));
 
-      final MultiScanSession mss = new MultiScanSession(credentials, threadPoolExtent, batch, ssiList, ssio, new Authorizations(authorizations));
+      final MultiScanSession mss = new MultiScanSession(credentials, threadPoolExtent, batch, ssiList, ssio, new Authorizations(authorizations), batchTimeOut);
 
       mss.numTablets = batch.size();
       for (List<Range> ranges : batch.values()) {
@@ -1108,7 +1111,7 @@ public class TabletServer extends AccumuloServerContext implements Runnable {
 
         IterConfig ic = compressedIters.decompress(tc.iterators);
 
-        Scanner scanner = tablet.createScanner(range, 1, EMPTY_COLUMNS, cs.auths, ic.ssiList, ic.ssio, false, cs.interruptFlag);
+        Scanner scanner = tablet.createScanner(range, 1, EMPTY_COLUMNS, cs.auths, ic.ssiList, ic.ssio, false, cs.interruptFlag, 0);
 
         try {
           ScanBatch batch = scanner.read();

http://git-wip-us.apache.org/repos/asf/accumulo/blob/81994647/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/LookupTask.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/LookupTask.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/LookupTask.java
index 08597f4..57a09ce 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/LookupTask.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/LookupTask.java
@@ -111,7 +111,7 @@ public class LookupTask extends ScanTask<MultiScanResult> {
             interruptFlag.set(true);
 
           lookupResult = tablet.lookup(entry.getValue(), session.columnSet, session.auths, results, maxResultsSize - bytesAdded, session.ssiList, session.ssio,
-              interruptFlag);
+              interruptFlag, session.batchTimeOut);
 
           // if the tablet was closed it it possible that the
           // interrupt flag was set.... do not want it set for

http://git-wip-us.apache.org/repos/asf/accumulo/blob/81994647/server/tserver/src/main/java/org/apache/accumulo/tserver/session/MultiScanSession.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/MultiScanSession.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/MultiScanSession.java
index b326e10..fccac47 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/MultiScanSession.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/MultiScanSession.java
@@ -36,6 +36,7 @@ public class MultiScanSession extends Session {
   public final List<IterInfo> ssiList;
   public final Map<String,Map<String,String>> ssio;
   public final Authorizations auths;
+  public final long batchTimeOut;
 
   // stats
   public int numRanges;
@@ -46,13 +47,14 @@ public class MultiScanSession extends Session {
   public volatile ScanTask<MultiScanResult> lookupTask;
 
   public MultiScanSession(TCredentials credentials, KeyExtent threadPoolExtent, Map<KeyExtent,List<Range>> queries, List<IterInfo> ssiList,
-      Map<String,Map<String,String>> ssio, Authorizations authorizations) {
+      Map<String,Map<String,String>> ssio, Authorizations authorizations, long batchTimeOut) {
     super(credentials);
     this.queries = queries;
     this.ssiList = ssiList;
     this.ssio = ssio;
     this.auths = authorizations;
     this.threadPoolExtent = threadPoolExtent;
+    this.batchTimeOut = batchTimeOut;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/accumulo/blob/81994647/server/tserver/src/main/java/org/apache/accumulo/tserver/session/ScanSession.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/ScanSession.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/ScanSession.java
index d5b0027..7a1d400 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/ScanSession.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/ScanSession.java
@@ -44,9 +44,10 @@ public class ScanSession extends Session {
   public volatile ScanTask<ScanBatch> nextBatchTask;
   public Scanner scanner;
   public final long readaheadThreshold;
+  public final long batchTimeOut;
 
   public ScanSession(TCredentials credentials, KeyExtent extent, Set<Column> columnSet, List<IterInfo> ssiList, Map<String,Map<String,String>> ssio,
-      Authorizations authorizations, long readaheadThreshold) {
+      Authorizations authorizations, long readaheadThreshold, long batchTimeOut) {
     super(credentials);
     this.extent = extent;
     this.columnSet = columnSet;
@@ -54,6 +55,7 @@ public class ScanSession extends Session {
     this.ssio = ssio;
     this.auths = authorizations;
     this.readaheadThreshold = readaheadThreshold;
+    this.batchTimeOut = batchTimeOut;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/accumulo/blob/81994647/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanDataSource.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanDataSource.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanDataSource.java
index 33277bd..853714a 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanDataSource.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanDataSource.java
@@ -65,10 +65,10 @@ class ScanDataSource implements DataSource {
   private final ScanOptions options;
 
   ScanDataSource(Tablet tablet, Authorizations authorizations, byte[] defaultLabels, HashSet<Column> columnSet, List<IterInfo> ssiList,
-      Map<String,Map<String,String>> ssio, AtomicBoolean interruptFlag) {
+      Map<String,Map<String,String>> ssio, AtomicBoolean interruptFlag, long batchTimeOut) {
     this.tablet = tablet;
     expectedDeletionCount = tablet.getDataSourceDeletions();
-    this.options = new ScanOptions(-1, authorizations, defaultLabels, columnSet, ssiList, ssio, interruptFlag, false);
+    this.options = new ScanOptions(-1, authorizations, defaultLabels, columnSet, ssiList, ssio, interruptFlag, false, batchTimeOut);
     this.interruptFlag = interruptFlag;
   }
 

http://git-wip-us.apache.org/repos/asf/accumulo/blob/81994647/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanOptions.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanOptions.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanOptions.java
index 93e8eee..2a38fbd 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanOptions.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanOptions.java
@@ -35,9 +35,10 @@ final class ScanOptions {
   private final AtomicBoolean interruptFlag;
   private final int num;
   private final boolean isolated;
+  private final long batchTimeOut;
 
   ScanOptions(int num, Authorizations authorizations, byte[] defaultLabels, Set<Column> columnSet, List<IterInfo> ssiList, Map<String,Map<String,String>> ssio,
-      AtomicBoolean interruptFlag, boolean isolated) {
+      AtomicBoolean interruptFlag, boolean isolated, long batchTimeOut) {
     this.num = num;
     this.authorizations = authorizations;
     this.defaultLabels = defaultLabels;
@@ -46,6 +47,7 @@ final class ScanOptions {
     this.ssio = ssio;
     this.interruptFlag = interruptFlag;
     this.isolated = isolated;
+    this.batchTimeOut = batchTimeOut;
   }
 
   public Authorizations getAuthorizations() {
@@ -79,4 +81,8 @@ final class ScanOptions {
   public boolean isIsolated() {
     return isolated;
   }
+
+  public long getBatchTimeOut() {
+    return batchTimeOut;
+  }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/81994647/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Scanner.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Scanner.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Scanner.java
index 790a352..3ce10d1 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Scanner.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Scanner.java
@@ -79,7 +79,7 @@ public class Scanner {
         iter = new SourceSwitchingIterator(dataSource, false);
       }
 
-      results = tablet.nextBatch(iter, range, options.getNum(), options.getColumnSet());
+      results = tablet.nextBatch(iter, range, options.getNum(), options.getColumnSet(), options.getBatchTimeOut());
 
       if (results.getResults() == null) {
         range = null;

http://git-wip-us.apache.org/repos/asf/accumulo/blob/81994647/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
index f131372..4930219 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
@@ -42,6 +42,7 @@ import java.util.TreeMap;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
@@ -718,7 +719,7 @@ public class Tablet implements TabletCommitter {
     }
   }
 
-  private LookupResult lookup(SortedKeyValueIterator<Key,Value> mmfi, List<Range> ranges, HashSet<Column> columnSet, List<KVEntry> results, long maxResultsSize)
+  private LookupResult lookup(SortedKeyValueIterator<Key,Value> mmfi, List<Range> ranges, HashSet<Column> columnSet, List<KVEntry> results, long maxResultsSize, long batchTimeOut)
       throws IOException {
 
     LookupResult lookupResult = new LookupResult();
@@ -730,9 +731,16 @@ public class Tablet implements TabletCommitter {
     if (columnSet.size() > 0)
       cfset = LocalityGroupUtil.families(columnSet);
 
+    long returnTime = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(batchTimeOut);
+    if (batchTimeOut <= 0 || batchTimeOut == Long.MAX_VALUE) {
+      batchTimeOut = 0;
+    }
+
     for (Range range : ranges) {
 
-      if (exceededMemoryUsage || tabletClosed) {
+      boolean timesUp = batchTimeOut > 0 && System.nanoTime() > returnTime;
+
+      if (exceededMemoryUsage || tabletClosed || timesUp) {
         lookupResult.unfinishedRanges.add(range);
         continue;
       }
@@ -756,7 +764,9 @@ public class Tablet implements TabletCommitter {
 
           exceededMemoryUsage = lookupResult.bytesAdded > maxResultsSize;
 
-          if (exceededMemoryUsage) {
+          timesUp = batchTimeOut > 0 && System.nanoTime() > returnTime;
+
+          if (exceededMemoryUsage || timesUp) {
             addUnfinishedRange(lookupResult, range, key, false);
             break;
           }
@@ -815,7 +825,7 @@ public class Tablet implements TabletCommitter {
   }
 
   public LookupResult lookup(List<Range> ranges, HashSet<Column> columns, Authorizations authorizations, List<KVEntry> results, long maxResultSize,
-      List<IterInfo> ssiList, Map<String,Map<String,String>> ssio, AtomicBoolean interruptFlag) throws IOException {
+      List<IterInfo> ssiList, Map<String,Map<String,String>> ssio, AtomicBoolean interruptFlag, long batchTimeOut) throws IOException {
 
     if (ranges.size() == 0) {
       return new LookupResult();
@@ -833,13 +843,13 @@ public class Tablet implements TabletCommitter {
       tabletRange.clip(range);
     }
 
-    ScanDataSource dataSource = new ScanDataSource(this, authorizations, this.defaultSecurityLabel, columns, ssiList, ssio, interruptFlag);
+    ScanDataSource dataSource = new ScanDataSource(this, authorizations, this.defaultSecurityLabel, columns, ssiList, ssio, interruptFlag, batchTimeOut);
 
     LookupResult result = null;
 
     try {
       SortedKeyValueIterator<Key,Value> iter = new SourceSwitchingIterator(dataSource);
-      result = lookup(iter, ranges, columns, results, maxResultSize);
+      result = lookup(iter, ranges, columns, results, maxResultSize, batchTimeOut);
       return result;
     } catch (IOException ioe) {
       dataSource.close(true);
@@ -857,10 +867,14 @@ public class Tablet implements TabletCommitter {
     }
   }
 
-  Batch nextBatch(SortedKeyValueIterator<Key,Value> iter, Range range, int num, Set<Column> columns) throws IOException {
+  Batch nextBatch(SortedKeyValueIterator<Key,Value> iter, Range range, int num, Set<Column> columns, long batchTimeOut) throws IOException {
 
     // log.info("In nextBatch..");
 
+    long stopTime = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(batchTimeOut);
+    if (batchTimeOut == Long.MAX_VALUE || batchTimeOut <= 0) {
+      batchTimeOut = 0;
+    }
     List<KVEntry> results = new ArrayList<KVEntry>();
     Key key = null;
 
@@ -890,7 +904,9 @@ public class Tablet implements TabletCommitter {
       resultSize += kvEntry.estimateMemoryUsed();
       resultBytes += kvEntry.numBytes();
 
-      if (resultSize >= maxResultsSize || results.size() >= num) {
+      boolean timesUp = batchTimeOut > 0 && System.nanoTime() >= stopTime;
+
+      if (resultSize >= maxResultsSize || results.size() >= num || timesUp) {
         continueKey = new Key(key);
         skipContinueKey = true;
         break;
@@ -931,12 +947,12 @@ public class Tablet implements TabletCommitter {
   }
 
   public Scanner createScanner(Range range, int num, Set<Column> columns, Authorizations authorizations, List<IterInfo> ssiList,
-      Map<String,Map<String,String>> ssio, boolean isolated, AtomicBoolean interruptFlag) {
+      Map<String,Map<String,String>> ssio, boolean isolated, AtomicBoolean interruptFlag, long batchTimeOut) {
     // do a test to see if this range falls within the tablet, if it does not
     // then clip will throw an exception
     extent.toDataRange().clip(range);
 
-    ScanOptions opts = new ScanOptions(num, authorizations, this.defaultSecurityLabel, columns, ssiList, ssio, interruptFlag, isolated);
+    ScanOptions opts = new ScanOptions(num, authorizations, this.defaultSecurityLabel, columns, ssiList, ssio, interruptFlag, isolated, batchTimeOut);
     return new Scanner(this, range, opts);
   }
 


Mime
View raw message