accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From els...@apache.org
Subject git commit: ACCUMULO-1566 Pass down the readaheadThreshold parameter from the client to the server so that the same limit is adhered to by the server in regards to pipelining.
Date Tue, 08 Oct 2013 22:57:51 GMT
Updated Branches:
  refs/heads/ACCUMULO-1566 [created] ff95c7147


ACCUMULO-1566 Pass down the readaheadThreshold parameter from the client to the
server so that the same limit is adhered to by the server in regards to
pipelining.


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/ff95c714
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/ff95c714
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/ff95c714

Branch: refs/heads/ACCUMULO-1566
Commit: ff95c7147d210171fef3824eae399b7384cdaff9
Parents: e70a40d
Author: Josh Elser <elserj@apache.org>
Authored: Tue Oct 8 18:54:30 2013 -0400
Committer: Josh Elser <elserj@apache.org>
Committed: Tue Oct 8 18:54:30 2013 -0400

----------------------------------------------------------------------
 .../core/client/impl/ThriftScanner.java         |  15 ++-
 .../thrift/TabletClientService.java             | 124 +++++++++++++++++--
 core/src/main/thrift/tabletserver.thrift        |   3 +-
 .../server/tabletserver/TabletServer.java       |  10 +-
 .../test/performance/thrift/NullTserver.java    |   2 +-
 5 files changed, 132 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/ff95c714/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftScanner.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftScanner.java b/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftScanner.java
index efdd142..efb31e8 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftScanner.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftScanner.java
@@ -27,6 +27,7 @@ import java.util.Set;
 import java.util.SortedMap;
 import java.util.SortedSet;
 
+import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.client.AccumuloSecurityException;
 import org.apache.accumulo.core.client.Instance;
@@ -97,7 +98,7 @@ public class ThriftScanner {
         boolean waitForWrites = !serversWaitedForWrites.get(ttype).contains(server);
         InitialScan isr = client.startScan(tinfo, scanState.credentials.toThrift(instance),
extent.toThrift(), scanState.range.toThrift(),
             Translator.translate(scanState.columns, Translator.CT), scanState.size, scanState.serverSideIteratorList,
scanState.serverSideIteratorOptions,
-            scanState.authorizations.getAuthorizationsBB(), waitForWrites, scanState.isolated);
+            scanState.authorizations.getAuthorizationsBB(), waitForWrites, scanState.isolated,
scanState.readaheadThreshold);
         if (waitForWrites)
           serversWaitedForWrites.get(ttype).add(server);
         
@@ -132,6 +133,7 @@ public class ThriftScanner {
     Text tableId;
     Text startRow;
     boolean skipStartRow;
+    long readaheadThreshold;
     
     Range range;
     
@@ -150,9 +152,15 @@ public class ThriftScanner {
     List<IterInfo> serverSideIteratorList;
     
     Map<String,Map<String,String>> serverSideIteratorOptions;
-    
+
     public ScanState(Instance instance, Credentials credentials, Text tableId, Authorizations
authorizations, Range range, SortedSet<Column> fetchedColumns,
         int size, List<IterInfo> serverSideIteratorList, Map<String,Map<String,String>>
serverSideIteratorOptions, boolean isolated) {
+      this(instance, credentials, tableId, authorizations, range, fetchedColumns, size, serverSideIteratorList,
serverSideIteratorOptions, isolated,
+          Constants.SCANNER_DEFAULT_READAHEAD_THRESHOLD);
+    }
+
+    public ScanState(Instance instance, Credentials credentials, Text tableId, Authorizations
authorizations, Range range, SortedSet<Column> fetchedColumns,
+        int size, List<IterInfo> serverSideIteratorList, Map<String,Map<String,String>>
serverSideIteratorOptions, boolean isolated, long readaheadThreshold) {
       this.instance = instance;
       this.credentials = credentials;
       this.authorizations = authorizations;
@@ -179,6 +187,7 @@ public class ThriftScanner {
       this.serverSideIteratorOptions = serverSideIteratorOptions;
       
       this.isolated = isolated;
+      this.readaheadThreshold = readaheadThreshold;
       
     }
   }
@@ -389,7 +398,7 @@ public class ThriftScanner {
         boolean waitForWrites = !serversWaitedForWrites.get(ttype).contains(loc.tablet_location);
         InitialScan is = client.startScan(tinfo, scanState.credentials.toThrift(scanState.instance),
loc.tablet_extent.toThrift(), scanState.range.toThrift(),
             Translator.translate(scanState.columns, Translator.CT), scanState.size, scanState.serverSideIteratorList,
scanState.serverSideIteratorOptions,
-            scanState.authorizations.getAuthorizationsBB(), waitForWrites, scanState.isolated);
+            scanState.authorizations.getAuthorizationsBB(), waitForWrites, scanState.isolated,
scanState.readaheadThreshold);
         if (waitForWrites)
           serversWaitedForWrites.get(ttype).add(loc.tablet_location);
         

http://git-wip-us.apache.org/repos/asf/accumulo/blob/ff95c714/core/src/main/java/org/apache/accumulo/core/tabletserver/thrift/TabletClientService.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/tabletserver/thrift/TabletClientService.java
b/core/src/main/java/org/apache/accumulo/core/tabletserver/thrift/TabletClientService.java
index bd6578d..d02b5da 100644
--- a/core/src/main/java/org/apache/accumulo/core/tabletserver/thrift/TabletClientService.java
+++ b/core/src/main/java/org/apache/accumulo/core/tabletserver/thrift/TabletClientService.java
@@ -50,7 +50,7 @@ import org.slf4j.LoggerFactory;
 
   public interface Iface extends org.apache.accumulo.core.client.impl.thrift.ClientService.Iface
{
 
-    public org.apache.accumulo.core.data.thrift.InitialScan startScan(org.apache.accumulo.trace.thrift.TInfo
tinfo, org.apache.accumulo.core.security.thrift.TCredentials credentials, org.apache.accumulo.core.data.thrift.TKeyExtent
extent, org.apache.accumulo.core.data.thrift.TRange range, List<org.apache.accumulo.core.data.thrift.TColumn>
columns, int batchSize, List<org.apache.accumulo.core.data.thrift.IterInfo> ssiList,
Map<String,Map<String,String>> ssio, List<ByteBuffer> authorizations, boolean
waitForWrites, boolean isolated) throws org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException,
NotServingTabletException, TooManyFilesException, org.apache.thrift.TException;
+    public org.apache.accumulo.core.data.thrift.InitialScan startScan(org.apache.accumulo.trace.thrift.TInfo
tinfo, org.apache.accumulo.core.security.thrift.TCredentials credentials, org.apache.accumulo.core.data.thrift.TKeyExtent
extent, org.apache.accumulo.core.data.thrift.TRange range, List<org.apache.accumulo.core.data.thrift.TColumn>
columns, int batchSize, List<org.apache.accumulo.core.data.thrift.IterInfo> ssiList,
Map<String,Map<String,String>> ssio, List<ByteBuffer> authorizations, boolean
waitForWrites, boolean isolated, long readaheadThreshold) throws org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException,
NotServingTabletException, TooManyFilesException, org.apache.thrift.TException;
 
     public org.apache.accumulo.core.data.thrift.ScanResult continueScan(org.apache.accumulo.trace.thrift.TInfo
tinfo, long scanID) throws NoSuchScanIDException, NotServingTabletException, TooManyFilesException,
org.apache.thrift.TException;
 
@@ -114,7 +114,7 @@ import org.slf4j.LoggerFactory;
 
   public interface AsyncIface extends org.apache.accumulo.core.client.impl.thrift.ClientService
.AsyncIface {
 
-    public void startScan(org.apache.accumulo.trace.thrift.TInfo tinfo, org.apache.accumulo.core.security.thrift.TCredentials
credentials, org.apache.accumulo.core.data.thrift.TKeyExtent extent, org.apache.accumulo.core.data.thrift.TRange
range, List<org.apache.accumulo.core.data.thrift.TColumn> columns, int batchSize, List<org.apache.accumulo.core.data.thrift.IterInfo>
ssiList, Map<String,Map<String,String>> ssio, List<ByteBuffer> authorizations,
boolean waitForWrites, boolean isolated, org.apache.thrift.async.AsyncMethodCallback<AsyncClient.startScan_call>
resultHandler) throws org.apache.thrift.TException;
+    public void startScan(org.apache.accumulo.trace.thrift.TInfo tinfo, org.apache.accumulo.core.security.thrift.TCredentials
credentials, org.apache.accumulo.core.data.thrift.TKeyExtent extent, org.apache.accumulo.core.data.thrift.TRange
range, List<org.apache.accumulo.core.data.thrift.TColumn> columns, int batchSize, List<org.apache.accumulo.core.data.thrift.IterInfo>
ssiList, Map<String,Map<String,String>> ssio, List<ByteBuffer> authorizations,
boolean waitForWrites, boolean isolated, long readaheadThreshold, org.apache.thrift.async.AsyncMethodCallback<AsyncClient.startScan_call>
resultHandler) throws org.apache.thrift.TException;
 
     public void continueScan(org.apache.accumulo.trace.thrift.TInfo tinfo, long scanID, org.apache.thrift.async.AsyncMethodCallback<AsyncClient.continueScan_call>
resultHandler) throws org.apache.thrift.TException;
 
@@ -196,13 +196,13 @@ import org.slf4j.LoggerFactory;
       super(iprot, oprot);
     }
 
-    public org.apache.accumulo.core.data.thrift.InitialScan startScan(org.apache.accumulo.trace.thrift.TInfo
tinfo, org.apache.accumulo.core.security.thrift.TCredentials credentials, org.apache.accumulo.core.data.thrift.TKeyExtent
extent, org.apache.accumulo.core.data.thrift.TRange range, List<org.apache.accumulo.core.data.thrift.TColumn>
columns, int batchSize, List<org.apache.accumulo.core.data.thrift.IterInfo> ssiList,
Map<String,Map<String,String>> ssio, List<ByteBuffer> authorizations, boolean
waitForWrites, boolean isolated) throws org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException,
NotServingTabletException, TooManyFilesException, org.apache.thrift.TException
+    public org.apache.accumulo.core.data.thrift.InitialScan startScan(org.apache.accumulo.trace.thrift.TInfo
tinfo, org.apache.accumulo.core.security.thrift.TCredentials credentials, org.apache.accumulo.core.data.thrift.TKeyExtent
extent, org.apache.accumulo.core.data.thrift.TRange range, List<org.apache.accumulo.core.data.thrift.TColumn>
columns, int batchSize, List<org.apache.accumulo.core.data.thrift.IterInfo> ssiList,
Map<String,Map<String,String>> ssio, List<ByteBuffer> authorizations, boolean
waitForWrites, boolean isolated, long readaheadThreshold) throws org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException,
NotServingTabletException, TooManyFilesException, org.apache.thrift.TException
     {
-      send_startScan(tinfo, credentials, extent, range, columns, batchSize, ssiList, ssio,
authorizations, waitForWrites, isolated);
+      send_startScan(tinfo, credentials, extent, range, columns, batchSize, ssiList, ssio,
authorizations, waitForWrites, isolated, readaheadThreshold);
       return recv_startScan();
     }
 
-    public void send_startScan(org.apache.accumulo.trace.thrift.TInfo tinfo, org.apache.accumulo.core.security.thrift.TCredentials
credentials, org.apache.accumulo.core.data.thrift.TKeyExtent extent, org.apache.accumulo.core.data.thrift.TRange
range, List<org.apache.accumulo.core.data.thrift.TColumn> columns, int batchSize, List<org.apache.accumulo.core.data.thrift.IterInfo>
ssiList, Map<String,Map<String,String>> ssio, List<ByteBuffer> authorizations,
boolean waitForWrites, boolean isolated) throws org.apache.thrift.TException
+    public void send_startScan(org.apache.accumulo.trace.thrift.TInfo tinfo, org.apache.accumulo.core.security.thrift.TCredentials
credentials, org.apache.accumulo.core.data.thrift.TKeyExtent extent, org.apache.accumulo.core.data.thrift.TRange
range, List<org.apache.accumulo.core.data.thrift.TColumn> columns, int batchSize, List<org.apache.accumulo.core.data.thrift.IterInfo>
ssiList, Map<String,Map<String,String>> ssio, List<ByteBuffer> authorizations,
boolean waitForWrites, boolean isolated, long readaheadThreshold) throws org.apache.thrift.TException
     {
       startScan_args args = new startScan_args();
       args.setTinfo(tinfo);
@@ -216,6 +216,7 @@ import org.slf4j.LoggerFactory;
       args.setAuthorizations(authorizations);
       args.setWaitForWrites(waitForWrites);
       args.setIsolated(isolated);
+      args.setReadaheadThreshold(readaheadThreshold);
       sendBase("startScan", args);
     }
 
@@ -922,9 +923,9 @@ import org.slf4j.LoggerFactory;
       super(protocolFactory, clientManager, transport);
     }
 
-    public void startScan(org.apache.accumulo.trace.thrift.TInfo tinfo, org.apache.accumulo.core.security.thrift.TCredentials
credentials, org.apache.accumulo.core.data.thrift.TKeyExtent extent, org.apache.accumulo.core.data.thrift.TRange
range, List<org.apache.accumulo.core.data.thrift.TColumn> columns, int batchSize, List<org.apache.accumulo.core.data.thrift.IterInfo>
ssiList, Map<String,Map<String,String>> ssio, List<ByteBuffer> authorizations,
boolean waitForWrites, boolean isolated, org.apache.thrift.async.AsyncMethodCallback<startScan_call>
resultHandler) throws org.apache.thrift.TException {
+    public void startScan(org.apache.accumulo.trace.thrift.TInfo tinfo, org.apache.accumulo.core.security.thrift.TCredentials
credentials, org.apache.accumulo.core.data.thrift.TKeyExtent extent, org.apache.accumulo.core.data.thrift.TRange
range, List<org.apache.accumulo.core.data.thrift.TColumn> columns, int batchSize, List<org.apache.accumulo.core.data.thrift.IterInfo>
ssiList, Map<String,Map<String,String>> ssio, List<ByteBuffer> authorizations,
boolean waitForWrites, boolean isolated, long readaheadThreshold, org.apache.thrift.async.AsyncMethodCallback<startScan_call>
resultHandler) throws org.apache.thrift.TException {
       checkReady();
-      startScan_call method_call = new startScan_call(tinfo, credentials, extent, range,
columns, batchSize, ssiList, ssio, authorizations, waitForWrites, isolated, resultHandler,
this, ___protocolFactory, ___transport);
+      startScan_call method_call = new startScan_call(tinfo, credentials, extent, range,
columns, batchSize, ssiList, ssio, authorizations, waitForWrites, isolated, readaheadThreshold,
resultHandler, this, ___protocolFactory, ___transport);
       this.___currentMethod = method_call;
       ___manager.call(method_call);
     }
@@ -941,7 +942,8 @@ import org.slf4j.LoggerFactory;
       private List<ByteBuffer> authorizations;
       private boolean waitForWrites;
       private boolean isolated;
-      public startScan_call(org.apache.accumulo.trace.thrift.TInfo tinfo, org.apache.accumulo.core.security.thrift.TCredentials
credentials, org.apache.accumulo.core.data.thrift.TKeyExtent extent, org.apache.accumulo.core.data.thrift.TRange
range, List<org.apache.accumulo.core.data.thrift.TColumn> columns, int batchSize, List<org.apache.accumulo.core.data.thrift.IterInfo>
ssiList, Map<String,Map<String,String>> ssio, List<ByteBuffer> authorizations,
boolean waitForWrites, boolean isolated, org.apache.thrift.async.AsyncMethodCallback<startScan_call>
resultHandler, org.apache.thrift.async.TAsyncClient client, org.apache.thrift.protocol.TProtocolFactory
protocolFactory, org.apache.thrift.transport.TNonblockingTransport transport) throws org.apache.thrift.TException
{
+      private long readaheadThreshold;
+      public startScan_call(org.apache.accumulo.trace.thrift.TInfo tinfo, org.apache.accumulo.core.security.thrift.TCredentials
credentials, org.apache.accumulo.core.data.thrift.TKeyExtent extent, org.apache.accumulo.core.data.thrift.TRange
range, List<org.apache.accumulo.core.data.thrift.TColumn> columns, int batchSize, List<org.apache.accumulo.core.data.thrift.IterInfo>
ssiList, Map<String,Map<String,String>> ssio, List<ByteBuffer> authorizations,
boolean waitForWrites, boolean isolated, long readaheadThreshold, org.apache.thrift.async.AsyncMethodCallback<startScan_call>
resultHandler, org.apache.thrift.async.TAsyncClient client, org.apache.thrift.protocol.TProtocolFactory
protocolFactory, org.apache.thrift.transport.TNonblockingTransport transport) throws org.apache.thrift.TException
{
         super(client, protocolFactory, transport, resultHandler, false);
         this.tinfo = tinfo;
         this.credentials = credentials;
@@ -954,6 +956,7 @@ import org.slf4j.LoggerFactory;
         this.authorizations = authorizations;
         this.waitForWrites = waitForWrites;
         this.isolated = isolated;
+        this.readaheadThreshold = readaheadThreshold;
       }
 
       public void write_args(org.apache.thrift.protocol.TProtocol prot) throws org.apache.thrift.TException
{
@@ -970,6 +973,7 @@ import org.slf4j.LoggerFactory;
         args.setAuthorizations(authorizations);
         args.setWaitForWrites(waitForWrites);
         args.setIsolated(isolated);
+        args.setReadaheadThreshold(readaheadThreshold);
         args.write(prot);
         prot.writeMessageEnd();
       }
@@ -2170,7 +2174,7 @@ import org.slf4j.LoggerFactory;
       public startScan_result getResult(I iface, startScan_args args) throws org.apache.thrift.TException
{
         startScan_result result = new startScan_result();
         try {
-          result.success = iface.startScan(args.tinfo, args.credentials, args.extent, args.range,
args.columns, args.batchSize, args.ssiList, args.ssio, args.authorizations, args.waitForWrites,
args.isolated);
+          result.success = iface.startScan(args.tinfo, args.credentials, args.extent, args.range,
args.columns, args.batchSize, args.ssiList, args.ssio, args.authorizations, args.waitForWrites,
args.isolated, args.readaheadThreshold);
         } catch (org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException sec)
{
           result.sec = sec;
         } catch (NotServingTabletException nste) {
@@ -2846,6 +2850,7 @@ import org.slf4j.LoggerFactory;
     private static final org.apache.thrift.protocol.TField AUTHORIZATIONS_FIELD_DESC = new
org.apache.thrift.protocol.TField("authorizations", org.apache.thrift.protocol.TType.LIST,
(short)8);
     private static final org.apache.thrift.protocol.TField WAIT_FOR_WRITES_FIELD_DESC = new
org.apache.thrift.protocol.TField("waitForWrites", org.apache.thrift.protocol.TType.BOOL,
(short)9);
     private static final org.apache.thrift.protocol.TField ISOLATED_FIELD_DESC = new org.apache.thrift.protocol.TField("isolated",
org.apache.thrift.protocol.TType.BOOL, (short)10);
+    private static final org.apache.thrift.protocol.TField READAHEAD_THRESHOLD_FIELD_DESC
= new org.apache.thrift.protocol.TField("readaheadThreshold", org.apache.thrift.protocol.TType.I64,
(short)12);
 
     private static final Map<Class<? extends IScheme>, SchemeFactory> schemes
= new HashMap<Class<? extends IScheme>, SchemeFactory>();
     static {
@@ -2864,6 +2869,7 @@ import org.slf4j.LoggerFactory;
     public List<ByteBuffer> authorizations; // required
     public boolean waitForWrites; // required
     public boolean isolated; // required
+    public long readaheadThreshold; // required
 
     /** The set of fields this struct contains, along with convenience methods for finding
and manipulating them. */
     @SuppressWarnings("all") public enum _Fields implements org.apache.thrift.TFieldIdEnum
{
@@ -2877,7 +2883,8 @@ import org.slf4j.LoggerFactory;
       SSIO((short)7, "ssio"),
       AUTHORIZATIONS((short)8, "authorizations"),
       WAIT_FOR_WRITES((short)9, "waitForWrites"),
-      ISOLATED((short)10, "isolated");
+      ISOLATED((short)10, "isolated"),
+      READAHEAD_THRESHOLD((short)12, "readaheadThreshold");
 
       private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
 
@@ -2914,6 +2921,8 @@ import org.slf4j.LoggerFactory;
             return WAIT_FOR_WRITES;
           case 10: // ISOLATED
             return ISOLATED;
+          case 12: // READAHEAD_THRESHOLD
+            return READAHEAD_THRESHOLD;
           default:
             return null;
         }
@@ -2957,6 +2966,7 @@ import org.slf4j.LoggerFactory;
     private static final int __BATCHSIZE_ISSET_ID = 0;
     private static final int __WAITFORWRITES_ISSET_ID = 1;
     private static final int __ISOLATED_ISSET_ID = 2;
+    private static final int __READAHEADTHRESHOLD_ISSET_ID = 3;
     private byte __isset_bitfield = 0;
     public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
     static {
@@ -2990,6 +3000,8 @@ import org.slf4j.LoggerFactory;
           new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.BOOL)));
       tmpMap.put(_Fields.ISOLATED, new org.apache.thrift.meta_data.FieldMetaData("isolated",
org.apache.thrift.TFieldRequirementType.DEFAULT, 
           new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.BOOL)));
+      tmpMap.put(_Fields.READAHEAD_THRESHOLD, new org.apache.thrift.meta_data.FieldMetaData("readaheadThreshold",
org.apache.thrift.TFieldRequirementType.DEFAULT, 
+          new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I64)));
       metaDataMap = Collections.unmodifiableMap(tmpMap);
       org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(startScan_args.class,
metaDataMap);
     }
@@ -3008,7 +3020,8 @@ import org.slf4j.LoggerFactory;
       Map<String,Map<String,String>> ssio,
       List<ByteBuffer> authorizations,
       boolean waitForWrites,
-      boolean isolated)
+      boolean isolated,
+      long readaheadThreshold)
     {
       this();
       this.tinfo = tinfo;
@@ -3025,6 +3038,8 @@ import org.slf4j.LoggerFactory;
       setWaitForWritesIsSet(true);
       this.isolated = isolated;
       setIsolatedIsSet(true);
+      this.readaheadThreshold = readaheadThreshold;
+      setReadaheadThresholdIsSet(true);
     }
 
     /**
@@ -3096,6 +3111,7 @@ import org.slf4j.LoggerFactory;
       }
       this.waitForWrites = other.waitForWrites;
       this.isolated = other.isolated;
+      this.readaheadThreshold = other.readaheadThreshold;
     }
 
     public startScan_args deepCopy() {
@@ -3118,6 +3134,8 @@ import org.slf4j.LoggerFactory;
       this.waitForWrites = false;
       setIsolatedIsSet(false);
       this.isolated = false;
+      setReadaheadThresholdIsSet(false);
+      this.readaheadThreshold = 0;
     }
 
     public org.apache.accumulo.trace.thrift.TInfo getTinfo() {
@@ -3437,6 +3455,29 @@ import org.slf4j.LoggerFactory;
       __isset_bitfield = EncodingUtils.setBit(__isset_bitfield, __ISOLATED_ISSET_ID, value);
     }
 
+    public long getReadaheadThreshold() {
+      return this.readaheadThreshold;
+    }
+
+    public startScan_args setReadaheadThreshold(long readaheadThreshold) {
+      this.readaheadThreshold = readaheadThreshold;
+      setReadaheadThresholdIsSet(true);
+      return this;
+    }
+
+    public void unsetReadaheadThreshold() {
+      __isset_bitfield = EncodingUtils.clearBit(__isset_bitfield, __READAHEADTHRESHOLD_ISSET_ID);
+    }
+
+    /** Returns true if field readaheadThreshold is set (has been assigned a value) and false
otherwise */
+    public boolean isSetReadaheadThreshold() {
+      return EncodingUtils.testBit(__isset_bitfield, __READAHEADTHRESHOLD_ISSET_ID);
+    }
+
+    public void setReadaheadThresholdIsSet(boolean value) {
+      __isset_bitfield = EncodingUtils.setBit(__isset_bitfield, __READAHEADTHRESHOLD_ISSET_ID,
value);
+    }
+
     public void setFieldValue(_Fields field, Object value) {
       switch (field) {
       case TINFO:
@@ -3527,6 +3568,14 @@ import org.slf4j.LoggerFactory;
         }
         break;
 
+      case READAHEAD_THRESHOLD:
+        if (value == null) {
+          unsetReadaheadThreshold();
+        } else {
+          setReadaheadThreshold((Long)value);
+        }
+        break;
+
       }
     }
 
@@ -3565,6 +3614,9 @@ import org.slf4j.LoggerFactory;
       case ISOLATED:
         return Boolean.valueOf(isIsolated());
 
+      case READAHEAD_THRESHOLD:
+        return Long.valueOf(getReadaheadThreshold());
+
       }
       throw new IllegalStateException();
     }
@@ -3598,6 +3650,8 @@ import org.slf4j.LoggerFactory;
         return isSetWaitForWrites();
       case ISOLATED:
         return isSetIsolated();
+      case READAHEAD_THRESHOLD:
+        return isSetReadaheadThreshold();
       }
       throw new IllegalStateException();
     }
@@ -3714,6 +3768,15 @@ import org.slf4j.LoggerFactory;
           return false;
       }
 
+      boolean this_present_readaheadThreshold = true;
+      boolean that_present_readaheadThreshold = true;
+      if (this_present_readaheadThreshold || that_present_readaheadThreshold) {
+        if (!(this_present_readaheadThreshold && that_present_readaheadThreshold))
+          return false;
+        if (this.readaheadThreshold != that.readaheadThreshold)
+          return false;
+      }
+
       return true;
     }
 
@@ -3840,6 +3903,16 @@ import org.slf4j.LoggerFactory;
           return lastComparison;
         }
       }
+      lastComparison = Boolean.valueOf(isSetReadaheadThreshold()).compareTo(typedOther.isSetReadaheadThreshold());
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+      if (isSetReadaheadThreshold()) {
+        lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.readaheadThreshold,
typedOther.readaheadThreshold);
+        if (lastComparison != 0) {
+          return lastComparison;
+        }
+      }
       return 0;
     }
 
@@ -3935,6 +4008,10 @@ import org.slf4j.LoggerFactory;
       sb.append("isolated:");
       sb.append(this.isolated);
       first = false;
+      if (!first) sb.append(", ");
+      sb.append("readaheadThreshold:");
+      sb.append(this.readaheadThreshold);
+      first = false;
       sb.append(")");
       return sb.toString();
     }
@@ -4140,6 +4217,14 @@ import org.slf4j.LoggerFactory;
                 org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
               }
               break;
+            case 12: // READAHEAD_THRESHOLD
+              if (schemeField.type == org.apache.thrift.protocol.TType.I64) {
+                struct.readaheadThreshold = iprot.readI64();
+                struct.setReadaheadThresholdIsSet(true);
+              } else { 
+                org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+              }
+              break;
             default:
               org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
           }
@@ -4241,6 +4326,9 @@ import org.slf4j.LoggerFactory;
           struct.tinfo.write(oprot);
           oprot.writeFieldEnd();
         }
+        oprot.writeFieldBegin(READAHEAD_THRESHOLD_FIELD_DESC);
+        oprot.writeI64(struct.readaheadThreshold);
+        oprot.writeFieldEnd();
         oprot.writeFieldStop();
         oprot.writeStructEnd();
       }
@@ -4292,7 +4380,10 @@ import org.slf4j.LoggerFactory;
         if (struct.isSetIsolated()) {
           optionals.set(10);
         }
-        oprot.writeBitSet(optionals, 11);
+        if (struct.isSetReadaheadThreshold()) {
+          optionals.set(11);
+        }
+        oprot.writeBitSet(optionals, 12);
         if (struct.isSetTinfo()) {
           struct.tinfo.write(oprot);
         }
@@ -4358,12 +4449,15 @@ import org.slf4j.LoggerFactory;
         if (struct.isSetIsolated()) {
           oprot.writeBool(struct.isolated);
         }
+        if (struct.isSetReadaheadThreshold()) {
+          oprot.writeI64(struct.readaheadThreshold);
+        }
       }
 
       @Override
       public void read(org.apache.thrift.protocol.TProtocol prot, startScan_args struct)
throws org.apache.thrift.TException {
         TTupleProtocol iprot = (TTupleProtocol) prot;
-        BitSet incoming = iprot.readBitSet(11);
+        BitSet incoming = iprot.readBitSet(12);
         if (incoming.get(0)) {
           struct.tinfo = new org.apache.accumulo.trace.thrift.TInfo();
           struct.tinfo.read(iprot);
@@ -4463,6 +4557,10 @@ import org.slf4j.LoggerFactory;
           struct.isolated = iprot.readBool();
           struct.setIsolatedIsSet(true);
         }
+        if (incoming.get(11)) {
+          struct.readaheadThreshold = iprot.readI64();
+          struct.setReadaheadThresholdIsSet(true);
+        }
       }
     }
 

http://git-wip-us.apache.org/repos/asf/accumulo/blob/ff95c714/core/src/main/thrift/tabletserver.thrift
----------------------------------------------------------------------
diff --git a/core/src/main/thrift/tabletserver.thrift b/core/src/main/thrift/tabletserver.thrift
index 4f9f13a..25e0b10 100644
--- a/core/src/main/thrift/tabletserver.thrift
+++ b/core/src/main/thrift/tabletserver.thrift
@@ -139,7 +139,8 @@ service TabletClientService extends client.ClientService {
                              7:map<string, map<string, string>> ssio,
                              8:list<binary> authorizations
                              9:bool waitForWrites,
-                             10:bool isolated)  throws (1:client.ThriftSecurityException
sec, 2:NotServingTabletException nste, 3:TooManyFilesException tmfe),
+                             10:bool isolated,
+                             12:i64 readaheadThreshold)  throws (1:client.ThriftSecurityException
sec, 2:NotServingTabletException nste, 3:TooManyFilesException tmfe),
                              
   data.ScanResult continueScan(2:trace.TInfo tinfo, 1:data.ScanID scanID)  throws (1:NoSuchScanIDException
nssi, 2:NotServingTabletException nste, 3:TooManyFilesException tmfe),
   oneway void closeScan(2:trace.TInfo tinfo, 1:data.ScanID scanID),

http://git-wip-us.apache.org/repos/asf/accumulo/blob/ff95c714/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
b/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
index abb8750..56f03af 100644
--- a/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
+++ b/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
@@ -795,6 +795,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
     public volatile ScanTask<ScanBatch> nextBatchTask;
     public AtomicBoolean interruptFlag;
     public Scanner scanner;
+    public long readaheadThreshold = Constants.SCANNER_DEFAULT_READAHEAD_THRESHOLD;
     
     @Override
     public void cleanup() {
@@ -1156,9 +1157,9 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
     
     @Override
     public InitialScan startScan(TInfo tinfo, TCredentials credentials, TKeyExtent textent,
TRange range, List<TColumn> columns, int batchSize,
-        List<IterInfo> ssiList, Map<String,Map<String,String>> ssio, List<ByteBuffer>
authorizations, boolean waitForWrites, boolean isolated)
-        throws NotServingTabletException, ThriftSecurityException, org.apache.accumulo.core.tabletserver.thrift.TooManyFilesException
{
-      
+        List<IterInfo> ssiList, Map<String,Map<String,String>> ssio, List<ByteBuffer>
authorizations, boolean waitForWrites, boolean isolated,
+        long readaheadThreshold) throws NotServingTabletException, ThriftSecurityException,
org.apache.accumulo.core.tabletserver.thrift.TooManyFilesException {
+
       Authorizations userauths = null;
       if (!security.canScan(credentials, new String(textent.getTable()), range, columns,
ssiList, ssio, authorizations))
         throw new ThriftSecurityException(credentials.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED);
@@ -1195,6 +1196,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
       scanSession.ssio = ssio;
       scanSession.auths = new Authorizations(authorizations);
       scanSession.interruptFlag = new AtomicBoolean();
+      scanSession.readaheadThreshold = readaheadThreshold;
       
       for (TColumn tcolumn : columns) {
         scanSession.columnSet.add(new Column(tcolumn));
@@ -1277,7 +1279,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
       
       scanSession.batchCount++;
       
-      if (scanResult.more && scanSession.batchCount > 3) {
+      if (scanResult.more && scanSession.batchCount > scanSession.readaheadThreshold)
{
         // start reading next batch while current batch is transmitted
         // to client
         scanSession.nextBatchTask = new NextBatchTask(scanID, scanSession.interruptFlag);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/ff95c714/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java
b/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java
index 9bb7604..f4eb234 100644
--- a/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java
+++ b/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java
@@ -138,7 +138,7 @@ public class NullTserver {
     
     @Override
     public InitialScan startScan(TInfo tinfo, TCredentials credentials, TKeyExtent extent,
TRange range, List<TColumn> columns, int batchSize,
-        List<IterInfo> ssiList, Map<String,Map<String,String>> ssio, List<ByteBuffer>
authorizations, boolean waitForWrites, boolean isolated) {
+        List<IterInfo> ssiList, Map<String,Map<String,String>> ssio, List<ByteBuffer>
authorizations, boolean waitForWrites, boolean isolated, long readaheadThreshold) {
       return null;
     }
     


Mime
View raw message