accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bil...@apache.org
Subject svn commit: r1341120 - in /accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client: ./ impl/ mock/
Date Mon, 21 May 2012 17:35:40 GMT
Author: billie
Date: Mon May 21 17:35:40 2012
New Revision: 1341120

URL: http://svn.apache.org/viewvc?rev=1341120&view=rev
Log:
ACCUMULO-580 moved batch size setter and getter to ScannerOptions, made scanner wrappers
(ClientSideIteratorScanner and IsolatedScanner) take range, batch size, and timeout parameters
from the scanners that they wrap

Modified:
    accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/ClientSideIteratorScanner.java
    accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/IsolatedScanner.java
    accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/Scanner.java
    accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/ScannerBase.java
    accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/OfflineScanner.java
    accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/ScannerImpl.java
    accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/ScannerOptions.java
    accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReaderIterator.java
    accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mock/MockScanner.java

Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/ClientSideIteratorScanner.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/ClientSideIteratorScanner.java?rev=1341120&r1=1341119&r2=1341120&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/ClientSideIteratorScanner.java (original)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/ClientSideIteratorScanner.java Mon May 21 17:35:40 2012
@@ -25,7 +25,6 @@ import java.util.Set;
 import java.util.TreeMap;
 import java.util.TreeSet;
 
-import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.client.impl.ScannerOptions;
 import org.apache.accumulo.core.client.mock.IteratorAdapter;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
@@ -53,9 +52,7 @@ import org.apache.hadoop.io.Text;
  * the source scanner (which will execute server side) and to the client side scanner (which will execute client side).
  */
 public class ClientSideIteratorScanner extends ScannerOptions implements Scanner {
-  private int size;
   private int timeOut;
-  
   private Range range;
   private boolean isolated = false;
   
@@ -134,9 +131,9 @@ public class ClientSideIteratorScanner e
    */
   public ClientSideIteratorScanner(Scanner scanner) {
     smi = new ScannerTranslator(scanner);
-    this.range = new Range((Key) null, (Key) null);
-    this.size = Constants.SCAN_BATCH_SIZE;
-    this.timeOut = Integer.MAX_VALUE;
+    setRange(scanner.getRange());
+    setBatchSize(scanner.getBatchSize());
+    setTimeOut(scanner.getTimeOut());
   }
   
   /**
@@ -150,7 +147,7 @@ public class ClientSideIteratorScanner e
   
   @Override
   public Iterator<Entry<Key,Value>> iterator() {
-    smi.scanner.setBatchSize(size);
+    smi.scanner.setBatchSize(getBatchSize());
     smi.scanner.setTimeOut(timeOut);
     if (isolated)
       smi.scanner.enableIsolation();
@@ -228,16 +225,6 @@ public class ClientSideIteratorScanner e
   }
   
   @Override
-  public void setBatchSize(int size) {
-    this.size = size;
-  }
-  
-  @Override
-  public int getBatchSize() {
-    return size;
-  }
-  
-  @Override
   public void enableIsolation() {
     this.isolated = true;
   }

Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/IsolatedScanner.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/IsolatedScanner.java?rev=1341120&r1=1341119&r2=1341120&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/IsolatedScanner.java (original)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/IsolatedScanner.java Mon May 21 17:35:40 2012
@@ -20,7 +20,6 @@ import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.Map.Entry;
 
-import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.client.impl.IsolationException;
 import org.apache.accumulo.core.client.impl.ScannerOptions;
 import org.apache.accumulo.core.data.ByteSequence;
@@ -210,7 +209,6 @@ public class IsolatedScanner extends Sca
   private Scanner scanner;
   private Range range;
   private int timeOut;
-  private int batchSize;
   private RowBufferFactory bufferFactory;
   
   public IsolatedScanner(Scanner scanner) {
@@ -219,15 +217,15 @@ public class IsolatedScanner extends Sca
   
   public IsolatedScanner(Scanner scanner, RowBufferFactory bufferFactory) {
     this.scanner = scanner;
-    this.range = new Range();
-    this.timeOut = Integer.MAX_VALUE;
-    this.batchSize = Constants.SCAN_BATCH_SIZE;
+    setRange(scanner.getRange());
+    setBatchSize(scanner.getBatchSize());
+    setTimeOut(scanner.getTimeOut());
     this.bufferFactory = bufferFactory;
   }
   
   @Override
   public Iterator<Entry<Key,Value>> iterator() {
-    return new RowBufferingIterator(scanner, this, range, timeOut, batchSize, bufferFactory);
+    return new RowBufferingIterator(scanner, this, range, timeOut, getBatchSize(), bufferFactory);
   }
   
   @Override
@@ -252,16 +250,6 @@ public class IsolatedScanner extends Sca
   }
   
   @Override
-  public void setBatchSize(int size) {
-    this.batchSize = size;
-  }
-  
-  @Override
-  public int getBatchSize() {
-    return batchSize;
-  }
-  
-  @Override
   public void enableIsolation() {
     // aye aye captain, already done sir
   }

Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/Scanner.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/Scanner.java?rev=1341120&r1=1341119&r2=1341120&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/Scanner.java (original)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/Scanner.java Mon May 21 17:35:40 2012
@@ -58,21 +58,6 @@ public interface Scanner extends Scanner
   public Range getRange();
   
   /**
-   * Sets the number of Key/Value pairs that will be fetched at a time from a tablet server.
-   * 
-   * @param size
-   *          the number of Key/Value pairs to fetch per call to Accumulo
-   */
-  public void setBatchSize(int size);
-  
-  /**
-   * Returns the batch size (number of Key/Value pairs) that will be fetched at a time from a tablet server.
-   * 
-   * @return the batch size configured for this scanner
-   */
-  public int getBatchSize();
-  
-  /**
    * Enables row isolation. Writes that occur to a row after a scan of that row has begun will not be seen if this option is enabled.
    */
   public void enableIsolation();

Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/ScannerBase.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/ScannerBase.java?rev=1341120&r1=1341119&r2=1341120&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/ScannerBase.java (original)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/ScannerBase.java Mon May 21 17:35:40 2012
@@ -92,6 +92,21 @@ public interface ScannerBase extends Ite
   public void clearScanIterators();
   
   /**
+   * Sets the number of Key/Value pairs that will be fetched at a time from a tablet server.
+   * 
+   * @param size
+   *          the number of Key/Value pairs to fetch per call to Accumulo
+   */
+  public void setBatchSize(int size);
+  
+  /**
+   * Returns the batch size (number of Key/Value pairs) that will be fetched at a time from a tablet server.
+   * 
+   * @return the batch size configured for this scanner
+   */
+  public int getBatchSize();
+  
+  /**
    * Returns an iterator over an accumulo table. This iterator uses the options that are currently set for its lifetime, so setting options will have no effect
    * on existing iterators.
    * 

Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/OfflineScanner.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/OfflineScanner.java?rev=1341120&r1=1341119&r2=1341120&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/OfflineScanner.java (original)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/OfflineScanner.java Mon May 21 17:35:40 2012
@@ -104,7 +104,7 @@ class OfflineIterator implements Iterato
       return new MultiIterator(allIters, false);
     }
   }
-
+  
   private SortedKeyValueIterator<Key,Value> iter;
   private Range range;
   private KeyExtent currentExtent;
@@ -114,7 +114,7 @@ class OfflineIterator implements Iterato
   private Instance instance;
   private ScannerOptions options;
   private ArrayList<SortedKeyValueIterator<Key,Value>> readers;
-
+  
   /**
    * @param offlineScanner
    * @param instance
@@ -130,7 +130,7 @@ class OfflineIterator implements Iterato
     if (this.options.fetchedColumns.size() > 0) {
       range = range.bound(this.options.fetchedColumns.first(), this.options.fetchedColumns.last());
     }
-
+    
     this.tableId = table.toString();
     this.authorizations = authorizations;
     this.readers = new ArrayList<SortedKeyValueIterator<Key,Value>>();
@@ -141,12 +141,12 @@ class OfflineIterator implements Iterato
       
       while (iter != null && !iter.hasTop())
         nextTablet();
-
+      
     } catch (Exception e) {
       throw new RuntimeException(e);
     }
   }
-
+  
   @Override
   public boolean hasNext() {
     return iter != null && iter.hasTop();
@@ -158,7 +158,7 @@ class OfflineIterator implements Iterato
       byte[] v = iter.getTopValue().get();
       // copy just like tablet server does, do this before calling next
       KeyValue ret = new KeyValue(new Key(iter.getTopKey()), Arrays.copyOf(v, v.length));
-
+      
       iter.next();
       
       while (iter != null && !iter.hasTop())
@@ -195,19 +195,19 @@ class OfflineIterator implements Iterato
         iter = null;
         return;
       }
-
+      
       if (range.afterEndKey(new Key(currentExtent.getEndRow()).followingKey(PartialKey.ROW))) {
         iter = null;
         return;
       }
-
+      
       nextRange = new Range(currentExtent.getMetadataEntry(), false, null, false);
     }
-
+    
     List<String> relFiles = new ArrayList<String>();
     
     Pair<KeyExtent,String> eloc = getTabletFiles(nextRange, relFiles);
-
+    
     while (eloc.getSecond() != null) {
       if (Tables.getTableState(instance, tableId) != TableState.OFFLINE) {
         Tables.clearCache(instance);
@@ -226,7 +226,7 @@ class OfflineIterator implements Iterato
     if (!extent.getTableId().toString().equals(tableId)) {
       throw new AccumuloException(" did not find tablets for table " + tableId + " " + extent);
     }
-
+    
     if (currentExtent != null && !extent.isPreviousExtent(currentExtent))
       throw new AccumuloException(" " + currentExtent + " is not previous extent " + extent);
     
@@ -259,7 +259,7 @@ class OfflineIterator implements Iterato
     while (row.hasNext()) {
       Entry<Key,Value> entry = row.next();
       Key key = entry.getKey();
-
+      
       if (key.getColumnFamily().equals(Constants.METADATA_DATAFILE_COLUMN_FAMILY)) {
         relFiles.add(key.getColumnQualifier().toString());
       }
@@ -272,11 +272,11 @@ class OfflineIterator implements Iterato
       if (Constants.METADATA_PREV_ROW_COLUMN.hasColumns(key)) {
         extent = new KeyExtent(key.getRow(), entry.getValue());
       }
-
+      
     }
     return new Pair<KeyExtent,String>(extent, location);
   }
-
+  
   /**
    * @param absFiles
    * @return
@@ -299,7 +299,7 @@ class OfflineIterator implements Iterato
     }
     
     readers.clear();
-
+    
     // TODO need to close files
     for (String file : absFiles) {
       FileSKVIterator reader = FileOperations.getInstance().openReader(file, false, fs, conf, acuTableConf, null, null);
@@ -326,7 +326,7 @@ class OfflineIterator implements Iterato
     return iterEnv.getTopLevelIterator(IteratorUtil.loadIterators(IteratorScope.scan, visFilter, extent, acuTableConf, options.serverSideIteratorList,
         options.serverSideIteratorOptions, iterEnv, false));
   }
-
+  
   @Override
   public void remove() {
     throw new UnsupportedOperationException();
@@ -339,7 +339,6 @@ class OfflineIterator implements Iterato
  */
 public class OfflineScanner extends ScannerOptions implements Scanner {
   
-  private int batchSize;
   private int timeOut;
   private Range range;
   
@@ -354,13 +353,12 @@ public class OfflineScanner extends Scan
     this.credentials = credentials;
     this.tableId = new Text(tableId);
     this.range = new Range((Key) null, (Key) null);
-
+    
     this.authorizations = authorizations;
     
-    this.batchSize = Constants.SCAN_BATCH_SIZE;
     this.timeOut = Integer.MAX_VALUE;
   }
-
+  
   @Override
   public void setTimeOut(int timeOut) {
     this.timeOut = timeOut;
@@ -382,16 +380,6 @@ public class OfflineScanner extends Scan
   }
   
   @Override
-  public void setBatchSize(int size) {
-    this.batchSize = size;
-  }
-  
-  @Override
-  public int getBatchSize() {
-    return batchSize;
-  }
-  
-  @Override
   public void enableIsolation() {
     
   }
@@ -405,5 +393,5 @@ public class OfflineScanner extends Scan
   public Iterator<Entry<Key,Value>> iterator() {
     return new OfflineIterator(this, instance, credentials, authorizations, tableId, range);
   }
-
+  
 }

Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/ScannerImpl.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/ScannerImpl.java?rev=1341120&r1=1341119&r2=1341120&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/ScannerImpl.java (original)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/ScannerImpl.java Mon May 21 17:35:40 2012
@@ -30,7 +30,6 @@ package org.apache.accumulo.core.client.
 import java.util.Iterator;
 import java.util.Map.Entry;
 
-import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.client.Instance;
 import org.apache.accumulo.core.client.Scanner;
 import org.apache.accumulo.core.data.Key;
@@ -53,7 +52,6 @@ public class ScannerImpl extends Scanner
   private Authorizations authorizations;
   private Text table;
   
-  private int size;
   private int timeOut;
   
   private Range range;
@@ -67,7 +65,6 @@ public class ScannerImpl extends Scanner
     this.range = new Range((Key) null, (Key) null);
     this.authorizations = authorizations;
     
-    this.size = Constants.SCAN_BATCH_SIZE;
     this.timeOut = Integer.MAX_VALUE;
   }
   
@@ -98,25 +95,12 @@ public class ScannerImpl extends Scanner
     return range;
   }
   
-  @Override
-  public synchronized void setBatchSize(int size) {
-    if (size > 0)
-      this.size = size;
-    else
-      throw new IllegalArgumentException("size must be greater than zero");
-  }
-  
-  @Override
-  public synchronized int getBatchSize() {
-    return size;
-  }
-  
   /**
    * Returns an iterator over an accumulo table. This iterator uses the options that are currently set on the scanner for its lifetime. So setting options on a
    * Scanner object will have no effect on existing iterators.
    */
   public synchronized Iterator<Entry<Key,Value>> iterator() {
-    return new ScannerIterator(instance, credentials, table, authorizations, range, size, timeOut, this, isolated);
+    return new ScannerIterator(instance, credentials, table, authorizations, range, getBatchSize(), timeOut, this, isolated);
   }
   
   @Override

Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/ScannerOptions.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/ScannerOptions.java?rev=1341120&r1=1341119&r2=1341120&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/ScannerOptions.java (original)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/ScannerOptions.java Mon May 21 17:35:40 2012
@@ -27,6 +27,7 @@ import java.util.Set;
 import java.util.SortedSet;
 import java.util.TreeSet;
 
+import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.client.IteratorSetting;
 import org.apache.accumulo.core.client.ScannerBase;
 import org.apache.accumulo.core.data.Column;
@@ -46,6 +47,8 @@ public class ScannerOptions implements S
   
   private String regexIterName = null;
   
+  private int size = Constants.SCAN_BATCH_SIZE;
+  
   protected ScannerOptions() {}
   
   public ScannerOptions(ScannerOptions so) {
@@ -99,7 +102,7 @@ public class ScannerOptions implements S
     
     serverSideIteratorOptions.remove(iteratorName);
   }
-    
+  
   /**
    * Override any existing options on the given named iterator
    */
@@ -178,6 +181,19 @@ public class ScannerOptions implements S
   }
   
   @Override
+  public synchronized void setBatchSize(int size) {
+    if (size > 0)
+      this.size = size;
+    else
+      throw new IllegalArgumentException("size must be greater than zero");
+  }
+  
+  @Override
+  public synchronized int getBatchSize() {
+    return size;
+  }
+  
+  @Override
   public Iterator<Entry<Key,Value>> iterator() {
     throw new UnsupportedOperationException();
   }

Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReaderIterator.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReaderIterator.java?rev=1341120&r1=1341119&r2=1341120&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReaderIterator.java (original)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReaderIterator.java Mon May 21 17:35:40 2012
@@ -69,7 +69,6 @@ import org.apache.thrift.TException;
 import org.apache.thrift.transport.TTransport;
 import org.apache.thrift.transport.TTransportException;
 
-
 public class TabletServerBatchReaderIterator implements Iterator<Entry<Key,Value>> {
   
   private static final Logger log = Logger.getLogger(TabletServerBatchReaderIterator.class);
@@ -82,7 +81,7 @@ public class TabletServerBatchReaderIter
   private final ExecutorService queryThreadPool;
   private final ScannerOptions options;
   
-  private ArrayBlockingQueue<Entry<Key,Value>> resultsQueue = new ArrayBlockingQueue<Entry<Key,Value>>(1000);
+  private ArrayBlockingQueue<Entry<Key,Value>> resultsQueue;
   private Entry<Key,Value> nextEntry = null;
   private Object nextLock = new Object();
   
@@ -131,6 +130,7 @@ public class TabletServerBatchReaderIter
     this.numThreads = numThreads;
     this.queryThreadPool = queryThreadPool;
     this.options = new ScannerOptions(scannerOptions);
+    this.resultsQueue = new ArrayBlockingQueue<Entry<Key,Value>>(this.options.getBatchSize());
     
     if (options.fetchedColumns.size() > 0) {
       ArrayList<Range> ranges2 = new ArrayList<Range>(ranges.size());

Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mock/MockScanner.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mock/MockScanner.java?rev=1341120&r1=1341119&r2=1341120&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mock/MockScanner.java (original)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mock/MockScanner.java Mon May 21 17:35:40 2012
@@ -32,7 +32,6 @@ import org.apache.accumulo.core.security
 public class MockScanner extends MockScannerBase implements Scanner {
   
   int timeOut = 0;
-  int batchSize = 0;
   Range range = new Range();
   
   MockScanner(MockTable table, Authorizations auths) {
@@ -60,16 +59,6 @@ public class MockScanner extends MockSca
   }
   
   @Override
-  public void setBatchSize(int size) {
-    this.batchSize = size;
-  }
-  
-  @Override
-  public int getBatchSize() {
-    return this.batchSize;
-  }
-  
-  @Override
   public void enableIsolation() {}
   
   @Override



Mime
View raw message