hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From apurt...@apache.org
Subject svn commit: r826464 - in /hadoop/hbase/trunk: ./ src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/transactional/ src/java/org/apache/hadoop/hbase/client/ src/java/org/apache/hadoop/hbase/regionserver/ src/test/org/apache/hadoop/h...
Date Sun, 18 Oct 2009 15:32:45 GMT
Author: apurtell
Date: Sun Oct 18 15:32:45 2009
New Revision: 826464

URL: http://svn.apache.org/viewvc?rev=826464&view=rev
Log:
HBASE-1537 Intra-row scanning

Added:
    hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/TestWideScanner.java
Modified:
    hadoop/hbase/trunk/CHANGES.txt
    hadoop/hbase/trunk/src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionState.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/Scan.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/InternalScanner.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/MinorCompactingStoreScanner.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java

Modified: hadoop/hbase/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/CHANGES.txt?rev=826464&r1=826463&r2=826464&view=diff
==============================================================================
--- hadoop/hbase/trunk/CHANGES.txt (original)
+++ hadoop/hbase/trunk/CHANGES.txt Sun Oct 18 15:32:45 2009
@@ -121,8 +121,9 @@
                (Kevin Patterson via Stack)
    HBASE-1903  Enable DEBUG by default
    HBASE-1907  Version all client writables
-   HBASE-1914  hlog should be able to set replication level for the log indendently
-               from any other files
+   HBASE-1914  hlog should be able to set replication level for the log
+               indendently from any other files
+   HBASE-1537  Intra-row scanning
 
   OPTIMIZATIONS
    HBASE-410   [testing] Speed up the test suite

Modified: hadoop/hbase/trunk/src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionState.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionState.java?rev=826464&r1=826463&r2=826464&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionState.java (original)
+++ hadoop/hbase/trunk/src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionState.java Sun Oct 18 15:32:45 2009
@@ -436,13 +436,16 @@
       return next;
     }
 
-    public boolean next(List<KeyValue> results) throws IOException {
+    public boolean next(List<KeyValue> results, int limit) throws IOException {
         KeyValue peek = this.peek();
         if (peek == null) {
           return false;
         }
         byte [] row = peek.getRow();
         results.add(peek);
+        if (limit > 0 && (results.size() == limit)) {
+          return true;
+        }
         while (true){
           if (this.peek() == null) {
             break;
@@ -451,10 +454,16 @@
             break;
           }
           results.add(this.next());
+          if (limit > 0 && (results.size() == limit)) {
+            break;
+          }
         }
-        return true;
-        
+        return true;        
     }
-    
+
+    public boolean next(List<KeyValue> results) throws IOException {
+      return next(results, -1);
+    }
+
    }
 }

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/Scan.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/Scan.java?rev=826464&r1=826463&r2=826464&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/Scan.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/Scan.java Sun Oct 18 15:32:45 2009
@@ -66,6 +66,9 @@
  * To limit the number of versions of each column to be returned, execute
  * {@link #setMaxVersions(int) setMaxVersions}.
  * <p>
+ * To limit the maximum number of values returned for each call to next(),
+ * execute {@link #setBatch(int) setBatch}.
+ * <p>
  * To add a filter, execute {@link #setFilter(org.apache.hadoop.hbase.filter.Filter) setFilter}.
  * <p>
  * Expert: To explicitly disable server-side block caching for this scan, 
@@ -77,6 +80,7 @@
   private byte [] startRow = HConstants.EMPTY_START_ROW;
   private byte [] stopRow  = HConstants.EMPTY_END_ROW;
   private int maxVersions = 1;
+  private int batch = -1;
   private int caching = -1;
   private boolean cacheBlocks = true;
   private Filter filter = null;
@@ -125,6 +129,7 @@
     startRow = scan.getStartRow();
     stopRow  = scan.getStopRow();
     maxVersions = scan.getMaxVersions();
+    batch = scan.getBatch();
     caching = scan.getCaching();
     cacheBlocks = scan.getCacheBlocks();
     filter = scan.getFilter(); // clone?
@@ -235,6 +240,14 @@
   }
 
   /**
+   * Set the maximum number of values to return for each call to next()
+   * @param batch the maximum number of values
+   */
+  public void setBatch(int batch) {
+    this.batch = batch;
+  }
+
+  /**
    * Set the number of rows for caching that will be passed to scanners.
    * If not set, the default setting from {@link HTable#getScannerCaching()} will apply.
    * Higher caching values will enable faster scanners but will use more memory.
@@ -319,6 +332,13 @@
   } 
 
   /**
+   * @return maximum number of values to return for a single call to next()
+   */
+  public int getBatch() {
+    return this.batch;
+  }
+
+  /**
    * @return caching the number of rows fetched when calling next on a scanner
    */
   public int getCaching() {
@@ -381,6 +401,8 @@
     sb.append(Bytes.toString(this.stopRow));
     sb.append(", maxVersions=");
     sb.append("" + this.maxVersions);
+    sb.append(", batch=");
+    sb.append("" + this.batch);
     sb.append(", caching=");
     sb.append("" + this.caching);
     sb.append(", cacheBlocks=");
@@ -444,6 +466,7 @@
     this.startRow = Bytes.readByteArray(in);
     this.stopRow = Bytes.readByteArray(in);
     this.maxVersions = in.readInt();
+    this.batch = in.readInt();
     this.caching = in.readInt();
     this.cacheBlocks = in.readBoolean();
     if(in.readBoolean()) {
@@ -473,6 +496,7 @@
     Bytes.writeByteArray(out, this.startRow);
     Bytes.writeByteArray(out, this.stopRow);
     out.writeInt(this.maxVersions);
+    out.writeInt(this.batch);
     out.writeInt(this.caching);
     out.writeBoolean(this.cacheBlocks);
     if(this.filter == null) {

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=826464&r1=826463&r2=826464&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java Sun Oct 18 15:32:45 2009
@@ -1679,9 +1679,12 @@
     private final byte [] stopRow;
     private Filter filter;
     private List<KeyValue> results = new ArrayList<KeyValue>();
+    private int batch;
 
     RegionScanner(Scan scan, List<KeyValueScanner> additionalScanners) {
       this.filter = scan.getFilter();
+      this.batch = scan.getBatch();
+
       if (Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW)) {
         this.stopRow = null;
       } else {
@@ -1711,14 +1714,14 @@
       }
     }
 
-    public boolean next(List<KeyValue> outResults) throws IOException {
+    public boolean next(List<KeyValue> outResults, int limit) throws IOException {
       if (closing.get() || closed.get()) {
         close();
         throw new NotServingRegionException(regionInfo.getRegionNameAsString() +
           " is closing=" + closing.get() + " or closed=" + closed.get());
       }
       results.clear();
-      boolean returnResult = nextInternal();
+      boolean returnResult = nextInternal(limit);
       if (!returnResult && filter != null && filter.filterRow()) {
         results.clear();
       }
@@ -1730,6 +1733,11 @@
       return returnResult;
     }
 
+    public boolean next(List<KeyValue> outResults) throws IOException {
+      // apply the batching limit by default
+      return next(outResults, batch);
+    }
+
     /*
      * @return True if a filter rules the scanner is over, done.
      */
@@ -1741,7 +1749,7 @@
      * @return true if there are more rows, false if scanner is done
      * @throws IOException
      */
-    private boolean nextInternal() throws IOException {
+    private boolean nextInternal(int limit) throws IOException {
       byte [] currentRow = null;
       boolean filterCurrentRow = false;
       while (true) {
@@ -1774,7 +1782,10 @@
           currentRow = row;
           continue;
         }
-        this.storeHeap.next(results);
+        this.storeHeap.next(results, limit);
+        if (limit > 0 && results.size() == limit) {
+          return true;
+        }
       }
     }
 

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/InternalScanner.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/InternalScanner.java?rev=826464&r1=826463&r2=826464&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/InternalScanner.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/InternalScanner.java Sun Oct 18 15:32:45 2009
@@ -46,12 +46,21 @@
    * @return true if more rows exist after this one, false if scanner is done
    * @throws IOException
    */
-  public boolean next(List<KeyValue> results)
-  throws IOException;
-  
+  public boolean next(List<KeyValue> results) throws IOException;
+
+  /**
+   * Grab the next row's worth of values with a limit on the number of values
+   * to return. 
+   * @param results
+   * @param limit
+   * @return true if more rows exist after this one, false if scanner is done
+   * @throws IOException
+   */
+  public boolean next(List<KeyValue> result, int limit) throws IOException;
+
   /**
    * Closes the scanner and releases any resources it has allocated
    * @throws IOException
    */
   public void close() throws IOException;
-}
\ No newline at end of file
+}

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java?rev=826464&r1=826463&r2=826464&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java Sun Oct 18 15:32:45 2009
@@ -96,11 +96,13 @@
    * <p>
    * This can ONLY be called when you are using Scanners that implement
    * InternalScanner as well as KeyValueScanner (a {@link StoreScanner}).
+   * @param result
+   * @param limit
    * @return true if there are more keys, false if all scanners are done 
    */
-  public boolean next(List<KeyValue> result) throws IOException {
+  public boolean next(List<KeyValue> result, int limit) throws IOException {
     InternalScanner currentAsInternal = (InternalScanner)this.current;
-    currentAsInternal.next(result);
+    currentAsInternal.next(result, limit);
     KeyValue pee = this.current.peek();
     if (pee == null) {
       this.current.close();
@@ -110,7 +112,21 @@
     this.current = this.heap.poll();
     return (this.current != null);
   }
-  
+
+  /**
+   * Gets the next row of keys from the top-most scanner.
+   * <p>
+   * This method takes care of updating the heap.
+   * <p>
+   * This can ONLY be called when you are using Scanners that implement
+   * InternalScanner as well as KeyValueScanner (a {@link StoreScanner}).
+   * @param result
+   * @return true if there are more keys, false if all scanners are done 
+   */
+  public boolean next(List<KeyValue> result) throws IOException {
+    return next(result, -1);
+  }
+
   private class KVScannerComparator implements Comparator<KeyValueScanner> {
     private KVComparator kvComparator;
     /**

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/MinorCompactingStoreScanner.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/MinorCompactingStoreScanner.java?rev=826464&r1=826463&r2=826464&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/MinorCompactingStoreScanner.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/MinorCompactingStoreScanner.java Sun Oct 18 15:32:45 2009
@@ -170,6 +170,12 @@
     return false;
   }
 
+  @Override
+  public boolean next(List<KeyValue> results, int limit) throws IOException {
+    // should not use limits with minor compacting store scanner
+    return next(results);
+  }
+
   public void close() {
     heap.close();
   }

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java?rev=826464&r1=826463&r2=826464&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java Sun Oct 18 15:32:45 2009
@@ -146,9 +146,10 @@
   /**
    * Get the next row of values from this Store.
    * @param result
+   * @param limit
    * @return true if there are more rows, false if scanner is done
    */
-  public synchronized boolean next(List<KeyValue> outResult) throws IOException {
+  public synchronized boolean next(List<KeyValue> outResult, int limit) throws IOException {
     KeyValue peeked = this.heap.peek();
     if (peeked == null) {
       close();
@@ -157,14 +158,17 @@
     matcher.setRow(peeked.getRow());
     KeyValue kv;
     List<KeyValue> results = new ArrayList<KeyValue>();
-    while((kv = this.heap.peek()) != null) {
+    LOOP: while((kv = this.heap.peek()) != null) {
       QueryMatcher.MatchCode qcode = matcher.match(kv);
       switch(qcode) {
         case INCLUDE:
           KeyValue next = this.heap.next();
           results.add(next);
+          if (limit > 0 && (results.size() == limit)) {
+            break LOOP;
+          }
           continue;
-          
+
         case DONE:
           // copy jazz
           outResult.addAll(results);
@@ -209,6 +213,10 @@
     return false;
   }
 
+  public synchronized boolean next(List<KeyValue> outResult) throws IOException {
+    return next(outResult, -1);
+  }
+
   private List<KeyValueScanner> getStoreFileScanners() {
     List<HFileScanner> s =
       new ArrayList<HFileScanner>(this.store.getStorefilesCount());

Added: hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/TestWideScanner.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/TestWideScanner.java?rev=826464&view=auto
==============================================================================
--- hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/TestWideScanner.java (added)
+++ hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/TestWideScanner.java Sun Oct 18 15:32:45 2009
@@ -0,0 +1,126 @@
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HBaseTestCase;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.hfile.Compression;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+
+public class TestWideScanner extends HBaseTestCase {
+  private final Log LOG = LogFactory.getLog(this.getClass());
+
+  final int BATCH = 1000;
+
+  private MiniDFSCluster cluster = null;
+  private HRegion r;
+
+  static final HTableDescriptor TESTTABLEDESC =
+    new HTableDescriptor("testwidescan");
+  static {
+    TESTTABLEDESC.addFamily(new HColumnDescriptor(HConstants.CATALOG_FAMILY,
+      10,  // Ten is arbitrary number.  Keep versions to help debuggging.
+      Compression.Algorithm.NONE.getName(), false, true, 8 * 1024,
+      HConstants.FOREVER, false));
+  }
+  /** HRegionInfo for root region */
+  public static final HRegionInfo REGION_INFO =
+    new HRegionInfo(TESTTABLEDESC, HConstants.EMPTY_BYTE_ARRAY,
+    HConstants.EMPTY_BYTE_ARRAY);
+
+  @Override
+  public void setUp() throws Exception {
+    cluster = new MiniDFSCluster(conf, 2, true, (String[])null);
+    // Set the hbase.rootdir to be the home directory in mini dfs.
+    this.conf.set(HConstants.HBASE_DIR,
+      this.cluster.getFileSystem().getHomeDirectory().toString());
+    super.setUp();    
+  }
+
+  private int addWideContent(HRegion region, byte[] family) 
+      throws IOException {
+    int count = 0;
+    // add a few rows of 2500 columns (we'll use batch of 1000) to make things
+    // interesting
+    for (char c = 'a'; c <= 'c'; c++) {
+      byte[] row = Bytes.toBytes("ab" + c);
+      int i;
+      for (i = 0; i < 2500; i++) {
+        byte[] b = Bytes.toBytes(String.format("%10d", i));
+        Put put = new Put(row);
+        put.add(family, b, b);
+        region.put(put);
+        count++;
+      }
+    }
+    // add one row of 100,000 columns
+    {
+      byte[] row = Bytes.toBytes("abf");
+      int i;
+      for (i = 0; i < 100000; i++) {
+        byte[] b = Bytes.toBytes(String.format("%10d", i));
+        Put put = new Put(row);
+        put.add(family, b, b);
+        region.put(put);
+        count++;
+      }
+    }
+    return count;
+  }
+
+  public void testWideScanBatching() throws IOException {
+    try {
+      this.r = createNewHRegion(REGION_INFO.getTableDesc(), null, null);
+      int inserted = addWideContent(this.r, HConstants.CATALOG_FAMILY);
+      List<KeyValue> results = new ArrayList<KeyValue>();
+      Scan scan = new Scan();
+      scan.addFamily(HConstants.CATALOG_FAMILY);
+      scan.setBatch(BATCH);
+      InternalScanner s = r.getScanner(scan);
+      int total = 0;
+      int i = 0;
+      boolean more;
+      do {
+        more = s.next(results);
+        i++;
+        LOG.info("iteration #" + i + ", results.size=" + results.size());
+
+        // assert that the result set is no larger than BATCH
+        assertTrue(results.size() <= BATCH);
+
+        total += results.size();
+
+        if (results.size() > 0) {
+          // assert that all results are from the same row
+          byte[] row = results.get(0).getRow();
+          for (KeyValue kv: results) {
+            assertTrue(Bytes.equals(row, kv.getRow()));
+          }
+        }
+
+        results.clear();
+      } while (more);
+
+      // assert that the scanner returned all values
+      LOG.info("inserted " + inserted + ", scanned " + total);
+      assertTrue(total == inserted);
+
+      s.close();
+    } finally {
+      this.r.close();
+      this.r.getLog().closeAndDelete();
+      shutdownDfs(this.cluster);
+    }
+  }
+}



Mime
View raw message