Subject: svn commit: r964496 - in /hbase/trunk: ./ src/main/java/org/apache/hadoop/hbase/io/ src/main/java/org/apache/hadoop/hbase/regionserver/ src/test/java/org/apache/hadoop/hbase/client/
Date: Thu, 15 Jul 2010 17:15:14 -0000
To: commits@hbase.apache.org
From: jgray@apache.org
Message-Id: <20100715171514.9716E2388903@eris.apache.org>

Author: jgray
Date: Thu Jul 15 17:15:14 2010
New Revision: 964496

URL: http://svn.apache.org/viewvc?rev=964496&view=rev
Log:
HBASE-2517  During reads when passed the specified time range, seek to next column (Pranav via jgray)

Modified:
    hbase/trunk/CHANGES.txt
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/TimeRange.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/ExplicitColumnTracker.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestMultipleTimestamps.java

Modified: hbase/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hbase/trunk/CHANGES.txt?rev=964496&r1=964495&r2=964496&view=diff
==============================================================================
--- hbase/trunk/CHANGES.txt (original)
+++ hbase/trunk/CHANGES.txt Thu Jul 15 17:15:14 2010
@@ -766,6 +766,8 @@ Release 0.21.0 - Unreleased
      (Pranav via Ryan)
    HBASE-2836  Speed mvn site building by removing generation of useless reports
    HBASE-2808  Document the implementation of replication
+   HBASE-2517  During reads when passed the specified time range, seek to
+               next column (Pranav via jgray)
 
  NEW FEATURES
    HBASE-1961  HBase EC2 scripts
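The gist of HBASE-2517: a store file keeps each column's versions sorted
newest-first, so once a KeyValue's timestamp falls below the scan's time
range, no later KeyValue in that column can match, and the matcher can seek
straight to the next column (or next row) instead of skipping KeyValues one
at a time. A condensed sketch of the decision this commit wires in, taken
from the ScanQueryMatcher hunk further down (the MatchCode names are the
committed ones; the surrounding method is elided):

    // tr.compare(ts): -1 below the range, 0 within, 1 at/above it.
    int timestampComparison = tr.compare(timestamp);
    if (timestampComparison >= 1) {
      // Newer than the range: older versions of this column follow, so SKIP.
      return MatchCode.SKIP;
    } else if (timestampComparison <= -1) {
      // Older than the range: nothing later in this column can match, so
      // seek to the next column (or the next row if no columns are left).
      return getNextRowOrNextColumn(bytes, offset, qualLength);
    }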
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/TimeRange.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/TimeRange.java?rev=964496&r1=964495&r2=964496&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/TimeRange.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/TimeRange.java Thu Jul 15 17:15:14 2010
@@ -147,6 +147,23 @@ public class TimeRange implements Writab
     return (timestamp >= minStamp);
   }
 
+  /**
+   * Compare the timestamp to timerange
+   * @param timestamp
+   * @return -1 if timestamp is less than timerange,
+   * 0 if timestamp is within timerange,
+   * 1 if timestamp is greater than timerange
+   */
+  public int compare(long timestamp) {
+    if (timestamp < minStamp) {
+      return -1;
+    } else if (timestamp >= maxStamp) {
+      return 1;
+    } else {
+      return 0;
+    }
+  }
+
   @Override
   public String toString() {
     StringBuilder sb = new StringBuilder();
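A quick worked example of the new compare(): per the code above, minStamp is
inclusive and maxStamp is exclusive, matching the existing withinTimeRange()
semantics. Assuming a TimeRange tr covering [10, 20) (the stamps here are
made up for illustration):

    tr.compare(9L);   // -1: below the range
    tr.compare(10L);  //  0: within (minStamp is inclusive)
    tr.compare(19L);  //  0: within
    tr.compare(20L);  //  1: at or above maxStamp (exclusive)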
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/ExplicitColumnTracker.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/ExplicitColumnTracker.java?rev=964496&r1=964495&r2=964496&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/ExplicitColumnTracker.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/ExplicitColumnTracker.java Thu Jul 15 17:15:14 2010
@@ -163,4 +163,39 @@ public class ExplicitColumnTracker imple
       col.setCount(this.maxVersions);
     }
   }
+
+  /**
+   * This method is used to inform the column tracker that we are done with
+   * this column. We may get this information from external filters or
+   * timestamp range and we then need to indicate this information to
+   * tracker. It is required only in case of ExplicitColumnTracker.
+   * @param bytes
+   * @param offset
+   * @param length
+   */
+  public void doneWithColumn(byte [] bytes, int offset, int length) {
+    while (this.column != null) {
+      int compare = Bytes.compareTo(column.getBuffer(), column.getOffset(),
+          column.getLength(), bytes, offset, length);
+      if (compare == 0) {
+        this.columns.remove(this.index);
+        if (this.columns.size() == this.index) {
+          // Will not hit any more columns in this storefile
+          this.column = null;
+        } else {
+          this.column = this.columns.get(this.index);
+        }
+        return;
+      } else if ( compare <= -1) {
+        if(++this.index != this.columns.size()) {
+          this.column = this.columns.get(this.index);
+        } else {
+          this.column = null;
+        }
+      } else {
+        return;
+      }
+    }
+  }
+
 }
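How doneWithColumn() gets used: the ScanQueryMatcher hunk below hands the
current qualifier to the tracker, then checks the column hint to choose
between a column seek and a row seek. Condensed from
ScanQueryMatcher.getNextRowOrNextColumn() in this same commit:

    ((ExplicitColumnTracker) columns).doneWithColumn(bytes, offset, qualLength);
    if (columns.getColumnHint() == null) {
      return MatchCode.SEEK_NEXT_ROW;  // no explicitly requested columns left
    } else {
      return MatchCode.SEEK_NEXT_COL;  // seek to the next requested column
    }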
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java?rev=964496&r1=964495&r2=964496&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java Thu Jul 15 17:15:14 2010
@@ -111,9 +111,16 @@ public class KeyValueHeap implements Key
       return false;
     }
     InternalScanner currentAsInternal = (InternalScanner)this.current;
-    currentAsInternal.next(result, limit);
+    boolean mayContainsMoreRows = currentAsInternal.next(result, limit);
     KeyValue pee = this.current.peek();
-    if (pee == null) {
+    /*
+     * By definition, any InternalScanner must return false only when it has no
+     * further rows to be fetched. So, we can close a scanner if it returns
+     * false. All existing implementations seem to be fine with this. It is much
+     * more efficient to close scanners which are not needed than keep them in
+     * the heap. This is also required for certain optimizations.
+     */
+    if (pee == null || !mayContainsMoreRows) {
       this.current.close();
     } else {
       this.heap.add(this.current);

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java?rev=964496&r1=964495&r2=964496&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java Thu Jul 15 17:15:14 2010
@@ -20,6 +20,7 @@
 
 package org.apache.hadoop.hbase.regionserver;
 
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.filter.Filter.ReturnCode;
@@ -34,6 +35,7 @@ public class ScanQueryMatcher extends Qu
   // Optimization so we can skip lots of compares when we decide to skip
   // to the next row.
   private boolean stickyNextRow;
+  private byte[] stopRow;
 
   /**
    * Constructs a QueryMatcher for a Scan.
@@ -50,6 +52,7 @@ public class ScanQueryMatcher extends Qu
     this.oldestStamp = System.currentTimeMillis() - ttl;
     this.rowComparator = rowComparator;
     this.deletes = new ScanDeleteTracker();
+    this.stopRow = scan.getStopRow();
     this.startKey = KeyValue.createFirstOnRow(scan.getStartRow());
     this.filter = scan.getFilter();
 
@@ -140,17 +143,37 @@ public class ScanQueryMatcher extends Qu
       return MatchCode.SKIP;
     }
 
-    if (!tr.withinTimeRange(timestamp)) {
+    if (!this.deletes.isEmpty() &&
+        deletes.isDeleted(bytes, offset, qualLength, timestamp)) {
       return MatchCode.SKIP;
     }
 
-    if (!this.deletes.isEmpty() &&
-        deletes.isDeleted(bytes, offset, qualLength, timestamp)) {
+    int timestampComparison = tr.compare(timestamp);
+    if (timestampComparison >= 1) {
       return MatchCode.SKIP;
+    } else if (timestampComparison <= -1) {
+      return getNextRowOrNextColumn(bytes, offset, qualLength);
     }
 
-    MatchCode colChecker = columns.checkColumn(bytes, offset, qualLength);
+    /**
+     * Filters should be checked before checking column trackers. If we do
+     * otherwise, as was previously being done, ColumnTracker may increment its
+     * counter for even that KV which may be discarded later on by Filter. This
+     * would lead to incorrect results in certain cases.
+     */
+    if (filter != null) {
+      ReturnCode filterResponse = filter.filterKeyValue(kv);
+      if (filterResponse == ReturnCode.SKIP) {
+        return MatchCode.SKIP;
+      } else if (filterResponse == ReturnCode.NEXT_COL) {
+        return getNextRowOrNextColumn(bytes, offset, qualLength);
+      } else if (filterResponse == ReturnCode.NEXT_ROW) {
+        stickyNextRow = true;
+        return MatchCode.SEEK_NEXT_ROW;
+      }
+    }
+
+    MatchCode colChecker = columns.checkColumn(bytes, offset, qualLength);
     // if SKIP -> SEEK_NEXT_COL
     // if (NEXT,DONE) -> SEEK_NEXT_ROW
     // if (INCLUDE) -> INCLUDE
@@ -161,24 +184,35 @@ public class ScanQueryMatcher extends Qu
       return MatchCode.SEEK_NEXT_ROW;
     }
 
-    // else INCLUDE
-    // if (colChecker == MatchCode.INCLUDE)
-    // give the filter a chance to run.
-    if (filter == null)
-      return MatchCode.INCLUDE;
-
-    ReturnCode filterResponse = filter.filterKeyValue(kv);
-    if (filterResponse == ReturnCode.INCLUDE)
-      return MatchCode.INCLUDE;
+    return MatchCode.INCLUDE;
 
-    if (filterResponse == ReturnCode.SKIP)
-      return MatchCode.SKIP;
-    else if (filterResponse == ReturnCode.NEXT_COL)
+  }
+
+  public MatchCode getNextRowOrNextColumn(byte[] bytes, int offset,
+      int qualLength) {
+    if (columns instanceof ExplicitColumnTracker) {
+      //We only come here when we know that columns is an instance of
+      //ExplicitColumnTracker so we should never have a cast exception
+      ((ExplicitColumnTracker)columns).doneWithColumn(bytes, offset,
+          qualLength);
+      if (columns.getColumnHint() == null) {
+        return MatchCode.SEEK_NEXT_ROW;
+      } else {
+        return MatchCode.SEEK_NEXT_COL;
+      }
+    } else {
       return MatchCode.SEEK_NEXT_COL;
-    // else if (filterResponse == ReturnCode.NEXT_ROW)
+    }
+  }
 
-    stickyNextRow = true;
-    return MatchCode.SEEK_NEXT_ROW;
+  public boolean moreRowsMayExistAfter(KeyValue kv) {
+    if (!Bytes.equals(stopRow , HConstants.EMPTY_END_ROW) &&
+        rowComparator.compareRows(kv.getBuffer(),kv.getRowOffset(),
+            kv.getRowLength(), stopRow, 0, stopRow.length) >= 0) {
+      return false;
+    } else {
+      return true;
+    }
   }
 
   /**
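moreRowsMayExistAfter() gives StoreScanner (next hunk) a cheap early-out:
once a SEEK_NEXT_ROW decision lands on a KeyValue at or past the scan's stop
row, nothing further can be returned. A hedged example of the behavior --
the row values and the kvAt() helper are hypothetical, only the method under
test is from this commit:

    // Assuming a Scan with stopRow "row5" (stop rows are exclusive):
    matcher.moreRowsMayExistAfter(kvAt("row3"));  // true: still before stopRow
    matcher.moreRowsMayExistAfter(kvAt("row5"));  // false: at/after stopRow
    // With an empty stopRow (HConstants.EMPTY_END_ROW) it always returns true.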
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java?rev=964496&r1=964495&r2=964496&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java Thu Jul 15 17:15:14 2010
@@ -256,6 +256,10 @@ class StoreScanner implements KeyValueSc
           return false;
 
         case SEEK_NEXT_ROW:
+          if (!matcher.moreRowsMayExistAfter(kv)) {
+            outResult.addAll(results);
+            return false;
+          }
           heap.next();
           break;
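For reading the new tests that follow: the put() helper added at the bottom
of this diff writes every (row, column, timestamp) combination it is given,
and the scan() helper sets the time range to [min(versions), max(versions) + 1).
So in the first test, rows {1,3,5,7} x columns {1,3,5} x timestamps 1..5 are
all written, and a scan over rows {3,5}, column 3, timestamps {3,4} with
maxVersions=2 should return, newest first:

    row 3, column 3: ts 4, then ts 3
    row 5, column 3: ts 4, then ts 3

which is exactly what the checkOneCell() assertions verify.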
Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestMultipleTimestamps.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestMultipleTimestamps.java?rev=964496&r1=964495&r2=964496&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestMultipleTimestamps.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestMultipleTimestamps.java Thu Jul 15 17:15:14 2010
@@ -79,6 +79,180 @@ public class TestMultipleTimestamps {
   }
 
   @Test
+  public void testReseeksWithOneColumnMiltipleTimestamp() throws IOException {
+    byte [] TABLE = Bytes.toBytes("testReseeksWithOne" +
+    "ColumnMiltipleTimestamps");
+    byte [] FAMILY = Bytes.toBytes("event_log");
+    byte [][] FAMILIES = new byte[][] { FAMILY };
+
+    // create table; set versions to max...
+    HTable ht = TEST_UTIL.createTable(TABLE, FAMILIES, Integer.MAX_VALUE);
+
+    Integer[] putRows = new Integer[] {1, 3, 5, 7};
+    Integer[] putColumns = new Integer[] { 1, 3, 5};
+    Long[] putTimestamps = new Long[] {1L, 2L, 3L, 4L, 5L};
+
+    Integer[] scanRows = new Integer[] {3, 5};
+    Integer[] scanColumns = new Integer[] {3};
+    Long[] scanTimestamps = new Long[] {3L, 4L};
+    int scanMaxVersions = 2;
+
+    put(ht, FAMILY, putRows, putColumns, putTimestamps);
+
+    flush();
+
+    ResultScanner scanner = scan(ht, FAMILY, scanRows, scanColumns,
+        scanTimestamps, scanMaxVersions);
+
+    KeyValue[] kvs;
+
+    kvs = scanner.next().raw();
+    assertEquals(2, kvs.length);
+    checkOneCell(kvs[0], FAMILY, 3, 3, 4);
+    checkOneCell(kvs[1], FAMILY, 3, 3, 3);
+    kvs = scanner.next().raw();
+    assertEquals(2, kvs.length);
+    checkOneCell(kvs[0], FAMILY, 5, 3, 4);
+    checkOneCell(kvs[1], FAMILY, 5, 3, 3);
+  }
+
+  @Test
+  public void testReseeksWithMultipleColumnOneTimestamp() throws IOException {
+    byte [] TABLE = Bytes.toBytes("testReseeksWithOne" +
+    "ColumnMiltipleTimestamps");
+    byte [] FAMILY = Bytes.toBytes("event_log");
+    byte [][] FAMILIES = new byte[][] { FAMILY };
+
+    // create table; set versions to max...
+    HTable ht = TEST_UTIL.createTable(TABLE, FAMILIES, Integer.MAX_VALUE);
+
+    Integer[] putRows = new Integer[] {1, 3, 5, 7};
+    Integer[] putColumns = new Integer[] { 1, 3, 5};
+    Long[] putTimestamps = new Long[] {1L, 2L, 3L, 4L, 5L};
+
+    Integer[] scanRows = new Integer[] {3, 5};
+    Integer[] scanColumns = new Integer[] {3,4};
+    Long[] scanTimestamps = new Long[] {3L};
+    int scanMaxVersions = 2;
+
+    put(ht, FAMILY, putRows, putColumns, putTimestamps);
+
+    flush();
+
+    ResultScanner scanner = scan(ht, FAMILY, scanRows, scanColumns,
+        scanTimestamps, scanMaxVersions);
+
+    KeyValue[] kvs;
+
+    kvs = scanner.next().raw();
+    assertEquals(1, kvs.length);
+    checkOneCell(kvs[0], FAMILY, 3, 3, 3);
+    kvs = scanner.next().raw();
+    assertEquals(1, kvs.length);
+    checkOneCell(kvs[0], FAMILY, 5, 3, 3);
+  }
+
+  @Test
+  public void testReseeksWithMultipleColumnMultipleTimestamp() throws
+  IOException {
+    byte [] TABLE = Bytes.toBytes("testReseeksWithOne" +
+    "ColumnMiltipleTimestamps");
+    byte [] FAMILY = Bytes.toBytes("event_log");
+    byte [][] FAMILIES = new byte[][] { FAMILY };
+
+    // create table; set versions to max...
+    HTable ht = TEST_UTIL.createTable(TABLE, FAMILIES, Integer.MAX_VALUE);
+
+    Integer[] putRows = new Integer[] {1, 3, 5, 7};
+    Integer[] putColumns = new Integer[] { 1, 3, 5};
+    Long[] putTimestamps = new Long[] {1L, 2L, 3L, 4L, 5L};
+
+    Integer[] scanRows = new Integer[] {5, 7};
+    Integer[] scanColumns = new Integer[] {3, 4, 5};
+    Long[] scanTimestamps = new Long[] {2l, 3L};
+    int scanMaxVersions = 2;
+
+    put(ht, FAMILY, putRows, putColumns, putTimestamps);
+
+    flush();
+
+    ResultScanner scanner = scan(ht, FAMILY, scanRows, scanColumns,
+        scanTimestamps, scanMaxVersions);
+
+    KeyValue[] kvs;
+
+    kvs = scanner.next().raw();
+    assertEquals(4, kvs.length);
+    checkOneCell(kvs[0], FAMILY, 5, 3, 3);
+    checkOneCell(kvs[1], FAMILY, 5, 3, 2);
+    checkOneCell(kvs[2], FAMILY, 5, 5, 3);
+    checkOneCell(kvs[3], FAMILY, 5, 5, 2);
+    kvs = scanner.next().raw();
+    assertEquals(4, kvs.length);
+    checkOneCell(kvs[0], FAMILY, 7, 3, 3);
+    checkOneCell(kvs[1], FAMILY, 7, 3, 2);
+    checkOneCell(kvs[2], FAMILY, 7, 5, 3);
+    checkOneCell(kvs[3], FAMILY, 7, 5, 2);
+  }
+
+  @Test
+  public void testReseeksWithMultipleFiles() throws IOException {
+    byte [] TABLE = Bytes.toBytes("testReseeksWithOne" +
+    "ColumnMiltipleTimestamps");
+    byte [] FAMILY = Bytes.toBytes("event_log");
+    byte [][] FAMILIES = new byte[][] { FAMILY };
+
+    // create table; set versions to max...
+    HTable ht = TEST_UTIL.createTable(TABLE, FAMILIES, Integer.MAX_VALUE);
+
+    Integer[] putRows1 = new Integer[] {1, 2, 3};
+    Integer[] putColumns1 = new Integer[] { 2, 5, 6};
+    Long[] putTimestamps1 = new Long[] {1L, 2L, 5L};
+
+    Integer[] putRows2 = new Integer[] {6, 7};
+    Integer[] putColumns2 = new Integer[] {3, 6};
+    Long[] putTimestamps2 = new Long[] {4L, 5L};
+
+    Integer[] putRows3 = new Integer[] {2, 3, 5};
+    Integer[] putColumns3 = new Integer[] {1, 2, 3};
+    Long[] putTimestamps3 = new Long[] {4L,8L};
+
+
+    Integer[] scanRows = new Integer[] {3, 5, 7};
+    Integer[] scanColumns = new Integer[] {3, 4, 5};
+    Long[] scanTimestamps = new Long[] {2l, 4L};
+    int scanMaxVersions = 5;
+
+    put(ht, FAMILY, putRows1, putColumns1, putTimestamps1);
+    flush();
+    put(ht, FAMILY, putRows2, putColumns2, putTimestamps2);
+    flush();
+    put(ht, FAMILY, putRows3, putColumns3, putTimestamps3);
+
+    ResultScanner scanner = scan(ht, FAMILY, scanRows, scanColumns,
+        scanTimestamps, scanMaxVersions);
+
+    KeyValue[] kvs;
+
+    kvs = scanner.next().raw();
+    assertEquals(2, kvs.length);
+    checkOneCell(kvs[0], FAMILY, 3, 3, 4);
+    checkOneCell(kvs[1], FAMILY, 3, 5, 2);
+
+    kvs = scanner.next().raw();
+    assertEquals(1, kvs.length);
+    checkOneCell(kvs[0], FAMILY, 5, 3, 4);
+
+    kvs = scanner.next().raw();
+    assertEquals(1, kvs.length);
+    checkOneCell(kvs[0], FAMILY, 6, 3, 4);
+
+    kvs = scanner.next().raw();
+    assertEquals(1, kvs.length);
+    checkOneCell(kvs[0], FAMILY, 7, 3, 4);
+  }
+
+  @Test
   public void testWithVersionDeletes() throws Exception {
 
     // first test from memstore (without flushing).
 
@@ -109,7 +283,8 @@ public class TestMultipleTimestamps {
 
     // request a bunch of versions including the deleted version. We should
     // only get back entries for the versions that exist.
-    KeyValue kvs[] = getNVersions(ht, FAMILY, 0, 0, Arrays.asList(2L, 3L, 4L, 5L));
+    KeyValue kvs[] = getNVersions(ht, FAMILY, 0, 0,
+        Arrays.asList(2L, 3L, 4L, 5L));
     assertEquals(3, kvs.length);
     checkOneCell(kvs[0], FAMILY, 0, 0, 5);
     checkOneCell(kvs[1], FAMILY, 0, 0, 3);
@@ -240,6 +415,44 @@ public class TestMultipleTimestamps {
     return result.raw();
   }
 
+  private ResultScanner scan(HTable ht, byte[] cf,
+      Integer[] rowIndexes, Integer[] columnIndexes,
+      Long[] versions, int maxVersions)
+  throws IOException {
+    Arrays.asList(rowIndexes);
+    byte startRow[] = Bytes.toBytes("row:" +
+        Collections.min( Arrays.asList(rowIndexes)));
+    byte endRow[] = Bytes.toBytes("row:" +
+        Collections.max( Arrays.asList(rowIndexes))+1);
+    Scan scan = new Scan(startRow, endRow);
+    for (Integer colIdx: columnIndexes) {
+      byte column[] = Bytes.toBytes("column:" + colIdx);
+      scan.addColumn(cf, column);
+    }
+    scan.setMaxVersions(maxVersions);
+    scan.setTimeRange(Collections.min(Arrays.asList(versions)),
+        Collections.max(Arrays.asList(versions))+1);
+    ResultScanner scanner = ht.getScanner(scan);
+    return scanner;
+  }
+
+  private void put(HTable ht, byte[] cf, Integer[] rowIndexes,
+      Integer[] columnIndexes, Long[] versions)
+  throws IOException {
+    for (int rowIdx: rowIndexes) {
+      byte row[] = Bytes.toBytes("row:" + rowIdx);
+      Put put = new Put(row);
+      for(int colIdx: columnIndexes) {
+        byte column[] = Bytes.toBytes("column:" + colIdx);
+        for (long version: versions) {
+          put.add(cf, column, version, Bytes.toBytes("value-version-" +
+              version));
+        }
+      }
+      ht.put(put);
+    }
+  }
+
   /**
    * Insert in specific row/column versions with timestamps
    * versionStart..versionEnd.