Return-Path: Delivered-To: apmail-hadoop-hbase-commits-archive@minotaur.apache.org Received: (qmail 33480 invoked from network); 17 Aug 2009 22:21:11 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.3) by minotaur.apache.org with SMTP; 17 Aug 2009 22:21:11 -0000 Received: (qmail 26851 invoked by uid 500); 17 Aug 2009 22:21:30 -0000 Delivered-To: apmail-hadoop-hbase-commits-archive@hadoop.apache.org Received: (qmail 26821 invoked by uid 500); 17 Aug 2009 22:21:30 -0000 Mailing-List: contact hbase-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: hbase-dev@hadoop.apache.org Delivered-To: mailing list hbase-commits@hadoop.apache.org Received: (qmail 26806 invoked by uid 99); 17 Aug 2009 22:21:30 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 17 Aug 2009 22:21:30 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 17 Aug 2009 22:21:21 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 2B1F423888DD; Mon, 17 Aug 2009 22:21:01 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r805183 - in /hadoop/hbase/trunk: ./ src/java/org/apache/hadoop/hbase/regionserver/ src/java/org/apache/hadoop/hbase/util/ src/test/org/apache/hadoop/hbase/io/ src/test/org/apache/hadoop/hbase/regionserver/ Date: Mon, 17 Aug 2009 22:21:00 -0000 To: hbase-commits@hadoop.apache.org From: stack@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20090817222101.2B1F423888DD@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: stack Date: Mon Aug 17 22:21:00 2009 New Revision: 805183 URL: http://svn.apache.org/viewvc?rev=805183&view=rev Log: HBASE-1738 Scanner doesnt reset when a snapshot is created, could miss new updates into the 'kvset' (active part) Modified: hadoop/hbase/trunk/CHANGES.txt hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/MemStore.java hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/util/ClassSize.java hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/io/TestHeapSize.java hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/TestMemStore.java Modified: hadoop/hbase/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/CHANGES.txt?rev=805183&r1=805182&r2=805183&view=diff ============================================================================== --- hadoop/hbase/trunk/CHANGES.txt (original) +++ hadoop/hbase/trunk/CHANGES.txt Mon Aug 17 22:21:00 2009 @@ -337,6 +337,8 @@ storefile problems HBASE-1761 getclosest doesn't understand delete family; manifests as "HRegionInfo was null or empty in .META" A.K.A the BS problem + HBASE-1738 Scanner doesnt reset when a snapshot is created, could miss + new updates into the 'kvset' (active part) IMPROVEMENTS HBASE-1089 Add count of regions on filesystem to master UI; add percentage Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/MemStore.java URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/MemStore.java?rev=805183&r1=805182&r2=805183&view=diff ============================================================================== --- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/MemStore.java (original) +++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/MemStore.java Mon Aug 17 22:21:00 2009 @@ -28,7 +28,9 @@ import java.util.Iterator; import java.util.List; import java.util.NavigableSet; +import java.util.Set; import java.util.SortedSet; +import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -78,6 +80,10 @@ // Used to track own heapSize final AtomicLong size; + // All access must be synchronized. + final CopyOnWriteArraySet changedMemStoreObservers = + new CopyOnWriteArraySet(); + /** * Default constructor. Used for tests. */ @@ -123,12 +129,10 @@ LOG.warn("Snapshot called again without clearing previous. " + "Doing nothing. Another ongoing flush or did we fail last attempt?"); } else { - // We used to synchronize on the memstore here but we're inside a - // write lock so removed it. Comment is left in case removal was a - // mistake. St.Ack if (!this.kvset.isEmpty()) { this.snapshot = this.kvset; this.kvset = new KeyValueSkipListSet(this.comparator); + tellChangedMemStoreObservers(); // Reset heap to not include any keys this.size.set(DEEP_OVERHEAD); } @@ -138,6 +142,15 @@ } } + /* + * Tell outstanding scanners that memstore has changed. + */ + private void tellChangedMemStoreObservers() { + for (ChangedMemStoreObserver o: this.changedMemStoreObservers) { + o.changedMemStore(); + } + } + /** * Return the current snapshot. * Called by flusher to get current snapshot made by a previous @@ -168,6 +181,7 @@ // create a new snapshot and let the old one go. if (!ss.isEmpty()) { this.snapshot = new KeyValueSkipListSet(this.comparator); + tellChangedMemStoreObservers(); } } finally { this.lock.writeLock().unlock(); @@ -445,9 +459,8 @@ KeyValueScanner [] getScanners() { this.lock.readLock().lock(); try { - KeyValueScanner [] scanners = new KeyValueScanner[2]; - scanners[0] = new MemStoreScanner(this.kvset); - scanners[1] = new MemStoreScanner(this.snapshot); + KeyValueScanner [] scanners = new KeyValueScanner[1]; + scanners[0] = new MemStoreScanner(this.changedMemStoreObservers); return scanners; } finally { this.lock.readLock().unlock(); @@ -521,18 +534,22 @@ /* * MemStoreScanner implements the KeyValueScanner. - * It lets the caller scan the contents of a memstore. - * This behaves as if it were a real scanner but does not maintain position - * in the passed memstore tree. - */ - protected class MemStoreScanner implements KeyValueScanner { - private final NavigableSet kvs; - private KeyValue current = null; + * It lets the caller scan the contents of a memstore -- both current + * map and snapshot. + * This behaves as if it were a real scanner but does not maintain position. + */ + protected class MemStoreScanner implements KeyValueScanner, ChangedMemStoreObserver { private List result = new ArrayList(); private int idx = 0; - - MemStoreScanner(final NavigableSet s) { - this.kvs = s; + // Make access atomic. + private FirstOnRow firstOnNextRow = new FirstOnRow(); + // Keep reference to Set so can remove myself when closed. + private final Set observers; + + MemStoreScanner(final Set observers) { + super(); + this.observers = observers; + this.observers.add(this); } public boolean seek(KeyValue key) { @@ -541,7 +558,7 @@ close(); return false; } - this.current = key; + this.firstOnNextRow.set(key); return cacheNextRow(); } catch(Exception e) { close(); @@ -570,47 +587,117 @@ } /** - * @return True if we successfully cached a NavigableSet aligned on - * next row. + * @return True if successfully cached a next row. */ boolean cacheNextRow() { - SortedSet keys; + // Prevent snapshot being cleared while caching a row. + lock.readLock().lock(); + this.result.clear(); + this.idx = 0; try { - keys = this.kvs.tailSet(this.current); - } catch (Exception e) { - close(); - return false; - } - if (keys == null || keys.isEmpty()) { - close(); - return false; + // Look at each set, kvset and snapshot. + // Both look for matching entries for this.current row returning what + // they + // have as next row after this.current (or null if nothing in set or if + // nothing follows. + KeyValue kvsetNextRow = cacheNextRow(kvset); + KeyValue snapshotNextRow = cacheNextRow(snapshot); + if (kvsetNextRow == null && snapshotNextRow == null) { + // Nothing more in memstore but we might have gotten current row + // results + // Indicate at end of store by setting next row to null. + this.firstOnNextRow.set(null); + return !this.result.isEmpty(); + } else if (kvsetNextRow != null && snapshotNextRow != null) { + // Set current at the lowest of the two values. + int compare = comparator.compare(kvsetNextRow, snapshotNextRow); + this.firstOnNextRow.set(compare <= 0? kvsetNextRow: snapshotNextRow); + } else { + this.firstOnNextRow.set(kvsetNextRow != null? kvsetNextRow: snapshotNextRow); + } + return true; + } finally { + lock.readLock().unlock(); } - this.current = null; - byte [] row = keys.first().getRow(); - for (KeyValue kv: keys) { - if (comparator.compareRows(kv, row) != 0) { - this.current = kv; + } + + /* + * See if set has entries for the this.current row. If so, + * add them to this.result. + * @param set Set to examine + * @return Next row in passed set or null if nothing in this + * passed set + */ + private KeyValue cacheNextRow(final NavigableSet set) { + if (this.firstOnNextRow.get() == null || set.isEmpty()) return null; + SortedSet tail = set.tailSet(this.firstOnNextRow.get()); + if (tail == null || tail.isEmpty()) return null; + KeyValue first = tail.first(); + KeyValue nextRow = null; + for (KeyValue kv: tail) { + if (comparator.compareRows(first, kv) != 0) { + nextRow = kv; break; } - result.add(kv); + this.result.add(kv); } - return true; + return nextRow; } public void close() { - current = null; + this.firstOnNextRow.set(null); idx = 0; if (!result.isEmpty()) { result.clear(); } + this.observers.remove(this); + } + + public void changedMemStore() { + this.firstOnNextRow.reset(); } } - + + /* + * Private class that holds firstOnRow and utility. + * Usually firstOnRow is the first KeyValue we find on next row rather than + * the absolute minimal first key (empty column, Type.Maximum, maximum ts). + * Usually its ok being sloppy with firstOnRow letting it be the first thing + * found on next row -- this works -- but if the memstore changes on us, reset + * firstOnRow to be the ultimate firstOnRow. We play sloppy with firstOnRow + * usually so we don't have to allocate a new KeyValue each time firstOnRow + * is updated. + */ + private static class FirstOnRow { + private KeyValue firstOnRow = null; + + FirstOnRow() { + super(); + } + + synchronized void set(final KeyValue kv) { + this.firstOnRow = kv; + } + + /* Reset firstOnRow to a 'clean', absolute firstOnRow. + */ + synchronized void reset() { + if (this.firstOnRow == null) return; + this.firstOnRow = + new KeyValue(this.firstOnRow.getRow(), HConstants.LATEST_TIMESTAMP); + } + + synchronized KeyValue get() { + return this.firstOnRow; + } + } + public final static long FIXED_OVERHEAD = ClassSize.align( - ClassSize.OBJECT + (7 * ClassSize.REFERENCE)); + ClassSize.OBJECT + (8 * ClassSize.REFERENCE)); public final static long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD + ClassSize.REENTRANT_LOCK + ClassSize.ATOMIC_LONG + + ClassSize.COPYONWRITE_ARRAYSET + ClassSize.COPYONWRITE_ARRAYLIST + (2 * ClassSize.CONCURRENT_SKIPLISTMAP)); /* @@ -682,4 +769,16 @@ } LOG.info("Exiting."); } -} + + /** + * Observers want to know about MemStore changes. + * Called when snapshot is cleared and when we make one. + */ + interface ChangedMemStoreObserver { + /** + * Notify observers. + * @throws IOException + */ + void changedMemStore(); + } +} \ No newline at end of file Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/util/ClassSize.java URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/util/ClassSize.java?rev=805183&r1=805182&r2=805183&view=diff ============================================================================== --- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/util/ClassSize.java (original) +++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/util/ClassSize.java Mon Aug 17 22:21:00 2009 @@ -92,6 +92,12 @@ /** Overhead for AtomicBoolean */ public static int ATOMIC_BOOLEAN = 0; + /** Overhead for CopyOnWriteArraySet */ + public static int COPYONWRITE_ARRAYSET = 0; + + /** Overhead for CopyOnWriteArrayList */ + public static int COPYONWRITE_ARRAYLIST = 0; + private static final String THIRTY_TWO = "32"; /** @@ -151,6 +157,9 @@ ATOMIC_BOOLEAN = align(OBJECT + Bytes.SIZEOF_BOOLEAN); + COPYONWRITE_ARRAYSET = align(OBJECT + REFERENCE); + + COPYONWRITE_ARRAYLIST = align(OBJECT + (2 * REFERENCE) + ARRAY); } /** Modified: hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/io/TestHeapSize.java URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/io/TestHeapSize.java?rev=805183&r1=805182&r2=805183&view=diff ============================================================================== --- hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/io/TestHeapSize.java (original) +++ hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/io/TestHeapSize.java Mon Aug 17 22:21:00 2009 @@ -6,6 +6,8 @@ import java.util.TreeMap; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -164,6 +166,25 @@ assertEquals(expected, actual); } + // CopyOnWriteArraySet + cl = CopyOnWriteArraySet.class; + expected = ClassSize.estimateBase(cl, false); + actual = ClassSize.COPYONWRITE_ARRAYSET; + if(expected != actual) { + ClassSize.estimateBase(cl, true); + assertEquals(expected, actual); + } + + // CopyOnWriteArrayList + cl = CopyOnWriteArrayList.class; + expected = ClassSize.estimateBase(cl, false); + actual = ClassSize.COPYONWRITE_ARRAYLIST; + if(expected != actual) { + ClassSize.estimateBase(cl, true); + assertEquals(expected, actual); + } + + } /** @@ -240,11 +261,15 @@ expected += ClassSize.estimateBase(AtomicLong.class, false); expected += ClassSize.estimateBase(ConcurrentSkipListMap.class, false); expected += ClassSize.estimateBase(ConcurrentSkipListMap.class, false); + expected += ClassSize.estimateBase(CopyOnWriteArraySet.class, false); + expected += ClassSize.estimateBase(CopyOnWriteArrayList.class, false); if(expected != actual) { ClassSize.estimateBase(cl, true); ClassSize.estimateBase(ReentrantReadWriteLock.class, true); ClassSize.estimateBase(AtomicLong.class, true); ClassSize.estimateBase(ConcurrentSkipListMap.class, true); + ClassSize.estimateBase(CopyOnWriteArraySet.class, true); + ClassSize.estimateBase(CopyOnWriteArrayList.class, true); assertEquals(expected, actual); } Modified: hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/TestMemStore.java URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/TestMemStore.java?rev=805183&r1=805182&r2=805183&view=diff ============================================================================== --- hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/TestMemStore.java (original) +++ hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/TestMemStore.java Mon Aug 17 22:21:00 2009 @@ -90,6 +90,10 @@ s.close(); } assertEquals(rowCount, count); + for (int i = 0; i < memstorescanners.length; i++) { + memstorescanners[0].close(); + } + memstorescanners = this.memstore.getScanners(); // Now assert can count same number even if a snapshot mid-scan. s = new StoreScanner(scan, null, HConstants.LATEST_TIMESTAMP, this.memstore.comparator, null, memstorescanners); @@ -112,6 +116,10 @@ s.close(); } assertEquals(rowCount, count); + for (int i = 0; i < memstorescanners.length; i++) { + memstorescanners[0].close(); + } + memstorescanners = this.memstore.getScanners(); // Assert that new values are seen in kvset as we scan. long ts = System.currentTimeMillis(); s = new StoreScanner(scan, null, HConstants.LATEST_TIMESTAMP, @@ -124,8 +132,7 @@ // Assert the stuff is coming out in right order. assertTrue(Bytes.compareTo(Bytes.toBytes(count), result.get(0).getRow()) == 0); // Row count is same as column count. - // TODO PUTBACK assertEquals("count=" + count + ", result=" + result, - // rowCount, result.size()); + assertEquals("count=" + count + ", result=" + result, rowCount, result.size()); count++; if (count == snapshotIndex) { this.memstore.snapshot(); @@ -407,8 +414,7 @@ assertEquals(expected.get(i), result.get(i)); } } - - + ////////////////////////////////////////////////////////////////////////////// // Delete tests ////////////////////////////////////////////////////////////////////////////// @@ -637,4 +643,4 @@ return new KeyValue(row, Bytes.toBytes("test_col:"), HConstants.LATEST_TIMESTAMP, value); } -} \ No newline at end of file +}