accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From els...@apache.org
Subject [1/3] git commit: ACCUMULO-1566 Lift out the implicit "3 batch" convention from ScannerIterator into Scanner so it can be configured by the user.
Date Tue, 08 Oct 2013 17:24:06 GMT
Updated Branches:
  refs/heads/master 24b44f947 -> dab1be962


ACCUMULO-1566 Lift out the implicit "3 batch" convention from ScannerIterator
into Scanner so it can be configured by the user.

The ScannerIterator previously began pre-fetching the next batch only after
three batches had been returned. Clients are in a better position to know when
read-ahead should begin, and, as such, we should let them configure it.


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/0d85d60c
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/0d85d60c
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/0d85d60c

Branch: refs/heads/master
Commit: 0d85d60c08f88bc6d3e366b192fba5a371654363
Parents: 24b44f9
Author: Josh Elser <elserj@apache.org>
Authored: Mon Oct 7 23:28:36 2013 -0400
Committer: Josh Elser <elserj@apache.org>
Committed: Mon Oct 7 23:28:36 2013 -0400

----------------------------------------------------------------------
 .../org/apache/accumulo/core/Constants.java     |  4 ++++
 .../core/client/ClientSideIteratorScanner.java  | 17 ++++++++++++++
 .../accumulo/core/client/IsolatedScanner.java   | 24 ++++++++++++++++++--
 .../apache/accumulo/core/client/Scanner.java    | 14 ++++++++++++
 .../core/client/impl/OfflineScanner.java        | 10 ++++++++
 .../accumulo/core/client/impl/ScannerImpl.java  | 17 +++++++++++++-
 .../core/client/impl/ScannerIterator.java       | 18 ++++++++++++---
 .../accumulo/core/client/mock/MockScanner.java  | 10 ++++++++
 .../monitor/servlets/trace/NullScanner.java     | 10 ++++++++
 .../server/util/OfflineMetadataScanner.java     | 10 ++++++++
 10 files changed, 128 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/0d85d60c/core/src/main/java/org/apache/accumulo/core/Constants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/Constants.java b/core/src/main/java/org/apache/accumulo/core/Constants.java
index ebcc47b..3b4c1e6 100644
--- a/core/src/main/java/org/apache/accumulo/core/Constants.java
+++ b/core/src/main/java/org/apache/accumulo/core/Constants.java
@@ -81,6 +81,10 @@ public class Constants {
   // this affects the table client caching of metadata
   public static final int SCAN_BATCH_SIZE = 1000;
   
+  // Scanners will default to fetching 3 batches of Key/Value pairs before asynchronously
+  // fetching the next batch.
+  public static final long SCANNER_DEFAULT_READAHEAD_THRESHOLD = 3l;
+  
   // Security configuration
   public static final String PW_HASH_ALGORITHM = "SHA-256";
   

http://git-wip-us.apache.org/repos/asf/accumulo/blob/0d85d60c/core/src/main/java/org/apache/accumulo/core/client/ClientSideIteratorScanner.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/ClientSideIteratorScanner.java
b/core/src/main/java/org/apache/accumulo/core/client/ClientSideIteratorScanner.java
index 3085f56..168e56f 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/ClientSideIteratorScanner.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/ClientSideIteratorScanner.java
@@ -26,6 +26,7 @@ import java.util.TreeMap;
 import java.util.TreeSet;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.client.impl.ScannerOptions;
 import org.apache.accumulo.core.client.mock.IteratorAdapter;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
@@ -57,6 +58,7 @@ public class ClientSideIteratorScanner extends ScannerOptions implements
Scanner
   
   private Range range;
   private boolean isolated = false;
+  private long readaheadThreshold = Constants.SCANNER_DEFAULT_READAHEAD_THRESHOLD;
   
   /**
    * A class that wraps a Scanner in a SortedKeyValueIterator so that other accumulo iterators
can use it as a source.
@@ -137,6 +139,7 @@ public class ClientSideIteratorScanner extends ScannerOptions implements
Scanner
     this.range = scanner.getRange();
     this.size = scanner.getBatchSize();
     this.timeOut = scanner.getTimeout(TimeUnit.MILLISECONDS);
+    this.readaheadThreshold = scanner.getReadaheadThreshold();
   }
   
   /**
@@ -152,6 +155,7 @@ public class ClientSideIteratorScanner extends ScannerOptions implements
Scanner
   public Iterator<Entry<Key,Value>> iterator() {
     smi.scanner.setBatchSize(size);
     smi.scanner.setTimeout(timeOut, TimeUnit.MILLISECONDS);
+    smi.scanner.setReadaheadThreshold(readaheadThreshold);
     if (isolated)
       smi.scanner.enableIsolation();
     else
@@ -254,4 +258,17 @@ public class ClientSideIteratorScanner extends ScannerOptions implements
Scanner
   public void disableIsolation() {
     this.isolated = false;
   }
+
+  @Override
+  public long getReadaheadThreshold() {
+    return readaheadThreshold;
+  }
+
+  @Override
+  public void setReadaheadThreshold(long batches) {
+    if (0 > batches) {
+      throw new IllegalArgumentException("Number of batches before read-ahead must be non-negative");
+    }
+    this.readaheadThreshold = batches;
+  }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/0d85d60c/core/src/main/java/org/apache/accumulo/core/client/IsolatedScanner.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/IsolatedScanner.java b/core/src/main/java/org/apache/accumulo/core/client/IsolatedScanner.java
index 4b362dc..4acea76 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/IsolatedScanner.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/IsolatedScanner.java
@@ -21,6 +21,7 @@ import java.util.Iterator;
 import java.util.Map.Entry;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.client.impl.IsolationException;
 import org.apache.accumulo.core.client.impl.ScannerOptions;
 import org.apache.accumulo.core.data.ByteSequence;
@@ -52,6 +53,7 @@ public class IsolatedScanner extends ScannerOptions implements Scanner {
     private ScannerOptions opts;
     private Range range;
     private int batchSize;
+    private long readaheadThreshold;
     
     private void readRow() {
       
@@ -123,6 +125,7 @@ public class IsolatedScanner extends ScannerOptions implements Scanner
{
         scanner.setBatchSize(batchSize);
         scanner.setTimeout(timeout, TimeUnit.MILLISECONDS);
         scanner.setRange(r);
+        scanner.setReadaheadThreshold(readaheadThreshold);
         setOptions((ScannerOptions) scanner, opts);
         
         return scanner.iterator();
@@ -130,12 +133,13 @@ public class IsolatedScanner extends ScannerOptions implements Scanner
{
       }
     }
     
-    public RowBufferingIterator(Scanner scanner, ScannerOptions opts, Range range, long timeout,
int batchSize, RowBufferFactory bufferFactory) {
+    public RowBufferingIterator(Scanner scanner, ScannerOptions opts, Range range, long timeout,
int batchSize, long readaheadThreshold, RowBufferFactory bufferFactory) {
       this.scanner = scanner;
       this.opts = new ScannerOptions(opts);
       this.range = range;
       this.timeout = timeout;
       this.batchSize = batchSize;
+      this.readaheadThreshold = readaheadThreshold;
       
       buffer = bufferFactory.newBuffer();
       
@@ -211,6 +215,7 @@ public class IsolatedScanner extends ScannerOptions implements Scanner
{
   private Scanner scanner;
   private Range range;
   private int batchSize;
+  private long readaheadThreshold;
   private RowBufferFactory bufferFactory;
   
   public IsolatedScanner(Scanner scanner) {
@@ -222,12 +227,13 @@ public class IsolatedScanner extends ScannerOptions implements Scanner
{
     this.range = scanner.getRange();
     this.timeOut = scanner.getTimeout(TimeUnit.MILLISECONDS);
     this.batchSize = scanner.getBatchSize();
+    this.readaheadThreshold = scanner.getReadaheadThreshold();
     this.bufferFactory = bufferFactory;
   }
   
   @Override
   public Iterator<Entry<Key,Value>> iterator() {
-    return new RowBufferingIterator(scanner, this, range, timeOut, batchSize, bufferFactory);
+    return new RowBufferingIterator(scanner, this, range, timeOut, batchSize, readaheadThreshold,
bufferFactory);
   }
   
   @Deprecated
@@ -278,4 +284,18 @@ public class IsolatedScanner extends ScannerOptions implements Scanner
{
   public void disableIsolation() {
     throw new UnsupportedOperationException();
   }
+
+  @Override
+  public long getReadaheadThreshold() {
+    return readaheadThreshold;
+  }
+
+  @Override
+  public void setReadaheadThreshold(long batches) {
+    if (0 > batches) {
+      throw new IllegalArgumentException("Number of batches before read-ahead must be non-negative");
+    }
+    
+    this.readaheadThreshold = batches;
+  }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/0d85d60c/core/src/main/java/org/apache/accumulo/core/client/Scanner.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/Scanner.java b/core/src/main/java/org/apache/accumulo/core/client/Scanner.java
index fe9edda..245aa18 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/Scanner.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/Scanner.java
@@ -85,4 +85,18 @@ public interface Scanner extends ScannerBase {
    * Disables row isolation. Writes that occur to a row after a scan of that row has begun
may be seen if this option is enabled.
    */
   void disableIsolation();
+  
+  /**
+   * The number of batches of Key/Value pairs returned before the {@link Scanner} will begin to prefetch the next batch
+   * @return Number of batches before read-ahead begins
+   * @since 1.6.0
+   */
+  public long getReadaheadThreshold();
+  
+  /**
+   * Sets the number of batches of Key/Value pairs returned before the {@link Scanner} will begin to prefetch the next batch
+   * @param batches Non-negative number of batches
+   * @since 1.6.0
+   */
+  public void setReadaheadThreshold(long batches); 
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/0d85d60c/core/src/main/java/org/apache/accumulo/core/client/impl/OfflineScanner.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/OfflineScanner.java b/core/src/main/java/org/apache/accumulo/core/client/impl/OfflineScanner.java
index 385a1cc..9f6f3cd 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/OfflineScanner.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/OfflineScanner.java
@@ -413,5 +413,15 @@ public class OfflineScanner extends ScannerOptions implements Scanner
{
   public Iterator<Entry<Key,Value>> iterator() {
     return new OfflineIterator(this, instance, credentials, authorizations, tableId, range);
   }
+
+  @Override
+  public long getReadaheadThreshold() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void setReadaheadThreshold(long batches) {
+    throw new UnsupportedOperationException();
+  }
   
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/0d85d60c/core/src/main/java/org/apache/accumulo/core/client/impl/ScannerImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/ScannerImpl.java b/core/src/main/java/org/apache/accumulo/core/client/impl/ScannerImpl.java
index 5f845cc..6be55b6 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/ScannerImpl.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ScannerImpl.java
@@ -55,6 +55,7 @@ public class ScannerImpl extends ScannerOptions implements Scanner {
   
   private Range range;
   private boolean isolated = false;
+  private long readaheadThreshold = Constants.SCANNER_DEFAULT_READAHEAD_THRESHOLD;
   
   public ScannerImpl(Instance instance, Credentials credentials, String table, Authorizations
authorizations) {
     ArgumentChecker.notNull(instance, credentials, table, authorizations);
@@ -97,7 +98,7 @@ public class ScannerImpl extends ScannerOptions implements Scanner {
    */
   @Override
   public synchronized Iterator<Entry<Key,Value>> iterator() {
-    return new ScannerIterator(instance, credentials, table, authorizations, range, size,
getTimeOut(), this, isolated);
+    return new ScannerIterator(instance, credentials, table, authorizations, range, size,
getTimeOut(), this, isolated, readaheadThreshold);
   }
   
   @Override
@@ -127,4 +128,18 @@ public class ScannerImpl extends ScannerOptions implements Scanner {
       return Integer.MAX_VALUE;
     return (int) timeout;
   }
+  
+  @Override
+  public synchronized void setReadaheadThreshold(long batches) {
+    if (0 > batches) {
+      throw new IllegalArgumentException("Number of batches before read-ahead must be non-negative");
+    }
+    
+    readaheadThreshold = batches;
+  }
+  
+  @Override
+  public synchronized long getReadaheadThreshold() {
+    return readaheadThreshold;
+  }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/0d85d60c/core/src/main/java/org/apache/accumulo/core/client/impl/ScannerIterator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/ScannerIterator.java
b/core/src/main/java/org/apache/accumulo/core/client/impl/ScannerIterator.java
index f81759d..e9d1412 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/ScannerIterator.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ScannerIterator.java
@@ -26,6 +26,7 @@ import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.client.AccumuloSecurityException;
 import org.apache.accumulo.core.client.Instance;
@@ -64,8 +65,9 @@ public class ScannerIterator implements Iterator<Entry<Key,Value>>
{
   
   private boolean finished = false;
   
-  private boolean readaheadInProgress;
+  private boolean readaheadInProgress = false;
   private long batchCount = 0;
+  private long readaheadThreshold;
   
   private static final List<KeyValue> EMPTY_LIST = Collections.emptyList();
   
@@ -123,10 +125,16 @@ public class ScannerIterator implements Iterator<Entry<Key,Value>>
{
   
   ScannerIterator(Instance instance, Credentials credentials, Text table, Authorizations
authorizations, Range range, int size, int timeOut,
       ScannerOptions options, boolean isolated) {
+    this(instance, credentials, table, authorizations, range, size, timeOut, options, isolated,
Constants.SCANNER_DEFAULT_READAHEAD_THRESHOLD);
+  }
+  
+  ScannerIterator(Instance instance, Credentials credentials, Text table, Authorizations
authorizations, Range range, int size, int timeOut,
+      ScannerOptions options, boolean isolated, long readaheadThreshold) {
     this.instance = instance;
     this.tableId = new Text(table);
     this.timeOut = timeOut;
     this.credentials = credentials;
+    this.readaheadThreshold = readaheadThreshold;
     
     this.options = new ScannerOptions(options);
     
@@ -138,7 +146,11 @@ public class ScannerIterator implements Iterator<Entry<Key,Value>>
{
     
     scanState = new ScanState(instance, credentials, tableId, authorizations, new Range(range),
options.fetchedColumns, size, options.serverSideIteratorList,
         options.serverSideIteratorOptions, isolated);
-    readaheadInProgress = false;
+    
+    // If we want to start readahead immediately, don't wait for hasNext to be called
+    if (0l == readaheadThreshold) {
+      initiateReadAhead();
+    }
     iter = null;
   }
   
@@ -185,7 +197,7 @@ public class ScannerIterator implements Iterator<Entry<Key,Value>>
{
       iter = currentBatch.iterator();
       batchCount++;
       
-      if (batchCount > 3) {
+      if (batchCount > readaheadThreshold) {
         // start a thread to read the next batch
         initiateReadAhead();
       }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/0d85d60c/core/src/main/java/org/apache/accumulo/core/client/mock/MockScanner.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/mock/MockScanner.java b/core/src/main/java/org/apache/accumulo/core/client/mock/MockScanner.java
index 002dbfc..e7c0ee0 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/mock/MockScanner.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/mock/MockScanner.java
@@ -109,5 +109,15 @@ public class MockScanner extends MockScannerBase implements Scanner {
     }
     
   }
+
+  @Override
+  public long getReadaheadThreshold() {
+    return 0;
+  }
+
+  @Override
+  public void setReadaheadThreshold(long batches) {
+    
+  }
   
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/0d85d60c/server/src/main/java/org/apache/accumulo/server/monitor/servlets/trace/NullScanner.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/monitor/servlets/trace/NullScanner.java
b/server/src/main/java/org/apache/accumulo/server/monitor/servlets/trace/NullScanner.java
index 11d20c0..6ee16b1 100644
--- a/server/src/main/java/org/apache/accumulo/server/monitor/servlets/trace/NullScanner.java
+++ b/server/src/main/java/org/apache/accumulo/server/monitor/servlets/trace/NullScanner.java
@@ -103,4 +103,14 @@ public class NullScanner implements Scanner {
   
   @Override
   public void close() {}
+
+  @Override
+  public long getReadaheadThreshold() {
+    return 0l;
+  }
+
+  @Override
+  public void setReadaheadThreshold(long batches) {
+    
+  }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/0d85d60c/server/src/main/java/org/apache/accumulo/server/util/OfflineMetadataScanner.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/util/OfflineMetadataScanner.java
b/server/src/main/java/org/apache/accumulo/server/util/OfflineMetadataScanner.java
index 5e82ada..3fee062 100644
--- a/server/src/main/java/org/apache/accumulo/server/util/OfflineMetadataScanner.java
+++ b/server/src/main/java/org/apache/accumulo/server/util/OfflineMetadataScanner.java
@@ -268,5 +268,15 @@ public class OfflineMetadataScanner extends ScannerOptions implements
Scanner {
     for (Entry<Key,Value> entry : scanner)
       System.out.println(entry.getKey() + " " + entry.getValue());
   }
+
+  @Override
+  public long getReadaheadThreshold() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void setReadaheadThreshold(long batches) {
+    throw new UnsupportedOperationException();
+  }
   
 }


Mime
View raw message