hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From st...@apache.org
Subject svn commit: r805183 - in /hadoop/hbase/trunk: ./ src/java/org/apache/hadoop/hbase/regionserver/ src/java/org/apache/hadoop/hbase/util/ src/test/org/apache/hadoop/hbase/io/ src/test/org/apache/hadoop/hbase/regionserver/
Date Mon, 17 Aug 2009 22:21:00 GMT
Author: stack
Date: Mon Aug 17 22:21:00 2009
New Revision: 805183

URL: http://svn.apache.org/viewvc?rev=805183&view=rev
Log:
HBASE-1738 Scanner doesnt reset when a snapshot is created, could miss new updates into the
'kvset' (active part)

Modified:
    hadoop/hbase/trunk/CHANGES.txt
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/MemStore.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/util/ClassSize.java
    hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/io/TestHeapSize.java
    hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/TestMemStore.java

Modified: hadoop/hbase/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/CHANGES.txt?rev=805183&r1=805182&r2=805183&view=diff
==============================================================================
--- hadoop/hbase/trunk/CHANGES.txt (original)
+++ hadoop/hbase/trunk/CHANGES.txt Mon Aug 17 22:21:00 2009
@@ -337,6 +337,8 @@
                storefile problems
    HBASE-1761  getclosest doesn't understand delete family; manifests as
                "HRegionInfo was null or empty in .META" A.K.A the BS problem
+   HBASE-1738  Scanner doesnt reset when a snapshot is created, could miss
+               new updates into the 'kvset' (active part)
 
   IMPROVEMENTS
    HBASE-1089  Add count of regions on filesystem to master UI; add percentage

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/MemStore.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/MemStore.java?rev=805183&r1=805182&r2=805183&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/MemStore.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/MemStore.java Mon Aug
17 22:21:00 2009
@@ -28,7 +28,9 @@
 import java.util.Iterator;
 import java.util.List;
 import java.util.NavigableSet;
+import java.util.Set;
 import java.util.SortedSet;
+import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
@@ -78,6 +80,10 @@
   // Used to track own heapSize
   final AtomicLong size;
 
+  // All access must be synchronized.
+  final CopyOnWriteArraySet<ChangedMemStoreObserver> changedMemStoreObservers =
+    new CopyOnWriteArraySet<ChangedMemStoreObserver>();
+
   /**
    * Default constructor. Used for tests.
    */
@@ -123,12 +129,10 @@
         LOG.warn("Snapshot called again without clearing previous. " +
           "Doing nothing. Another ongoing flush or did we fail last attempt?");
       } else {
-        // We used to synchronize on the memstore here but we're inside a
-        // write lock so removed it. Comment is left in case removal was a
-        // mistake. St.Ack
         if (!this.kvset.isEmpty()) {
           this.snapshot = this.kvset;
           this.kvset = new KeyValueSkipListSet(this.comparator);
+          tellChangedMemStoreObservers();
           // Reset heap to not include any keys
           this.size.set(DEEP_OVERHEAD);
         }
@@ -138,6 +142,15 @@
     }
   }
 
+  /*
+   * Tell outstanding scanners that memstore has changed.
+   */
+  private void tellChangedMemStoreObservers() {
+    for (ChangedMemStoreObserver o: this.changedMemStoreObservers) {
+      o.changedMemStore();
+    }
+  }
+
   /**
    * Return the current snapshot.
    * Called by flusher to get current snapshot made by a previous
@@ -168,6 +181,7 @@
       // create a new snapshot and let the old one go.
       if (!ss.isEmpty()) {
         this.snapshot = new KeyValueSkipListSet(this.comparator);
+        tellChangedMemStoreObservers();
       }
     } finally {
       this.lock.writeLock().unlock();
@@ -445,9 +459,8 @@
   KeyValueScanner [] getScanners() {
     this.lock.readLock().lock();
     try {
-      KeyValueScanner [] scanners = new KeyValueScanner[2];
-      scanners[0] = new MemStoreScanner(this.kvset);
-      scanners[1] = new MemStoreScanner(this.snapshot);
+      KeyValueScanner [] scanners = new KeyValueScanner[1];
+      scanners[0] = new MemStoreScanner(this.changedMemStoreObservers);
       return scanners;
     } finally {
       this.lock.readLock().unlock();
@@ -521,18 +534,22 @@
 
   /*
    * MemStoreScanner implements the KeyValueScanner.
-   * It lets the caller scan the contents of a memstore.
-   * This behaves as if it were a real scanner but does not maintain position
-   * in the passed memstore tree.
-   */
-  protected class MemStoreScanner implements KeyValueScanner {
-    private final NavigableSet<KeyValue> kvs;
-    private KeyValue current = null;
+   * It lets the caller scan the contents of a memstore -- both current
+   * map and snapshot.
+   * This behaves as if it were a real scanner but does not maintain position.
+   */
+  protected class MemStoreScanner implements KeyValueScanner, ChangedMemStoreObserver {
     private List<KeyValue> result = new ArrayList<KeyValue>();
     private int idx = 0;
-
-    MemStoreScanner(final NavigableSet<KeyValue> s) {
-      this.kvs = s;
+    // Make access atomic.
+    private FirstOnRow firstOnNextRow = new FirstOnRow();
+    // Keep reference to Set so can remove myself when closed.
+    private final Set<ChangedMemStoreObserver> observers;
+
+    MemStoreScanner(final Set<ChangedMemStoreObserver> observers) {
+      super();
+      this.observers = observers;
+      this.observers.add(this);
     }
 
     public boolean seek(KeyValue key) {
@@ -541,7 +558,7 @@
           close();
           return false;
         }
-        this.current = key;
+        this.firstOnNextRow.set(key);
         return cacheNextRow();
       } catch(Exception e) {
         close();
@@ -570,47 +587,117 @@
     }
 
     /**
-     * @return True if we successfully cached a NavigableSet aligned on
-     * next row.
+     * @return True if successfully cached a next row.
      */
     boolean cacheNextRow() {
-      SortedSet<KeyValue> keys;
+      // Prevent snapshot being cleared while caching a row.
+      lock.readLock().lock();
+      this.result.clear();
+      this.idx = 0;
       try {
-        keys = this.kvs.tailSet(this.current);
-      } catch (Exception e) {
-        close();
-        return false;
-      }
-      if (keys == null || keys.isEmpty()) {
-        close();
-        return false;
+        // Look at each set, kvset and snapshot.
+        // Both look for matching entries for this.current row returning what
+        // they
+        // have as next row after this.current (or null if nothing in set or if
+        // nothing follows.
+        KeyValue kvsetNextRow = cacheNextRow(kvset);
+        KeyValue snapshotNextRow = cacheNextRow(snapshot);
+        if (kvsetNextRow == null && snapshotNextRow == null) {
+          // Nothing more in memstore but we might have gotten current row
+          // results
+          // Indicate at end of store by setting next row to null.
+          this.firstOnNextRow.set(null);
+          return !this.result.isEmpty();
+        } else if (kvsetNextRow != null && snapshotNextRow != null) {
+          // Set current at the lowest of the two values.
+          int compare = comparator.compare(kvsetNextRow, snapshotNextRow);
+          this.firstOnNextRow.set(compare <= 0? kvsetNextRow: snapshotNextRow);
+        } else {
+          this.firstOnNextRow.set(kvsetNextRow != null? kvsetNextRow: snapshotNextRow);
+        }
+        return true;
+      } finally {
+        lock.readLock().unlock();
       }
-      this.current = null;
-      byte [] row = keys.first().getRow();
-      for (KeyValue kv: keys) {
-        if (comparator.compareRows(kv, row) != 0) {
-          this.current = kv;
+    }
+
+    /*
+     * See if set has entries for the <code>this.current</code> row.  If so,
+     * add them to <code>this.result</code>.
+     * @param set Set to examine
+     * @return Next row in passed <code>set</code> or null if nothing in this
+     * passed <code>set</code>
+     */
+    private KeyValue cacheNextRow(final NavigableSet<KeyValue> set) {
+      if (this.firstOnNextRow.get() == null || set.isEmpty()) return null;
+      SortedSet<KeyValue> tail = set.tailSet(this.firstOnNextRow.get());
+      if (tail == null || tail.isEmpty()) return null;
+      KeyValue first = tail.first();
+      KeyValue nextRow = null;
+      for (KeyValue kv: tail) {
+        if (comparator.compareRows(first, kv) != 0) {
+          nextRow = kv;
           break;
         }
-        result.add(kv);
+        this.result.add(kv);
       }
-      return true;
+      return nextRow;
     }
 
     public void close() {
-      current = null;
+      this.firstOnNextRow.set(null);
       idx = 0;
       if (!result.isEmpty()) {
         result.clear();
       }
+      this.observers.remove(this);
+    }
+
+    public void changedMemStore() {
+      this.firstOnNextRow.reset();
     }
   }
-  
+
+  /*
+   * Private class that holds firstOnRow and utility.
+   * Usually firstOnRow is the first KeyValue we find on next row rather than
+   * the absolute minimal first key (empty column, Type.Maximum, maximum ts).
+   * Usually its ok being sloppy with firstOnRow letting it be the first thing
+   * found on next row -- this works -- but if the memstore changes on us, reset
+   * firstOnRow to be the ultimate firstOnRow.  We play sloppy with firstOnRow
+   * usually so we don't have to  allocate a new KeyValue each time firstOnRow
+   * is updated.
+   */
+  private static class FirstOnRow {
+    private KeyValue firstOnRow = null;
+
+    FirstOnRow() {
+      super();
+    }
+
+    synchronized void set(final KeyValue kv) {
+      this.firstOnRow = kv;
+    }
+
+    /* Reset firstOnRow to a 'clean', absolute firstOnRow.
+     */
+    synchronized void reset() {
+      if (this.firstOnRow == null) return;
+      this.firstOnRow =
+         new KeyValue(this.firstOnRow.getRow(), HConstants.LATEST_TIMESTAMP);
+    }
+
+    synchronized KeyValue get() {
+      return this.firstOnRow;
+    }
+  }
+
   public final static long FIXED_OVERHEAD = ClassSize.align(
-      ClassSize.OBJECT + (7 * ClassSize.REFERENCE));
+      ClassSize.OBJECT + (8 * ClassSize.REFERENCE));
   
   public final static long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD +
       ClassSize.REENTRANT_LOCK + ClassSize.ATOMIC_LONG +
+      ClassSize.COPYONWRITE_ARRAYSET + ClassSize.COPYONWRITE_ARRAYLIST +
       (2 * ClassSize.CONCURRENT_SKIPLISTMAP));
 
   /*
@@ -682,4 +769,16 @@
     }
     LOG.info("Exiting.");
   }
-}
+
+  /**
+   * Observers want to know about MemStore changes.
+   * Called when snapshot is cleared and when we make one.
+   */
+  interface ChangedMemStoreObserver {
+    /**
+     * Notify observers.
+     * @throws IOException
+     */
+    void changedMemStore();
+  }
+}
\ No newline at end of file

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/util/ClassSize.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/util/ClassSize.java?rev=805183&r1=805182&r2=805183&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/util/ClassSize.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/util/ClassSize.java Mon Aug 17 22:21:00
2009
@@ -92,6 +92,12 @@
   /** Overhead for AtomicBoolean */
   public static int ATOMIC_BOOLEAN = 0;
   
+  /** Overhead for CopyOnWriteArraySet */
+  public static int COPYONWRITE_ARRAYSET = 0;
+  
+  /** Overhead for CopyOnWriteArrayList */
+  public static int COPYONWRITE_ARRAYLIST = 0;
+  
   private static final String THIRTY_TWO = "32";
 
   /**
@@ -151,6 +157,9 @@
     
     ATOMIC_BOOLEAN = align(OBJECT + Bytes.SIZEOF_BOOLEAN);
     
+    COPYONWRITE_ARRAYSET = align(OBJECT + REFERENCE);
+    
+    COPYONWRITE_ARRAYLIST = align(OBJECT + (2 * REFERENCE) + ARRAY);
   }
   
   /**

Modified: hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/io/TestHeapSize.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/io/TestHeapSize.java?rev=805183&r1=805182&r2=805183&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/io/TestHeapSize.java (original)
+++ hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/io/TestHeapSize.java Mon Aug 17 22:21:00
2009
@@ -6,6 +6,8 @@
 import java.util.TreeMap;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
@@ -164,6 +166,25 @@
       assertEquals(expected, actual);
     }
     
+    // CopyOnWriteArraySet
+    cl = CopyOnWriteArraySet.class;
+    expected = ClassSize.estimateBase(cl, false);
+    actual = ClassSize.COPYONWRITE_ARRAYSET;
+    if(expected != actual) {
+      ClassSize.estimateBase(cl, true);
+      assertEquals(expected, actual);
+    }
+    
+    // CopyOnWriteArrayList
+    cl = CopyOnWriteArrayList.class;
+    expected = ClassSize.estimateBase(cl, false);
+    actual = ClassSize.COPYONWRITE_ARRAYLIST;
+    if(expected != actual) {
+      ClassSize.estimateBase(cl, true);
+      assertEquals(expected, actual);
+    }
+    
+    
   }
   
   /**
@@ -240,11 +261,15 @@
     expected += ClassSize.estimateBase(AtomicLong.class, false);
     expected += ClassSize.estimateBase(ConcurrentSkipListMap.class, false);
     expected += ClassSize.estimateBase(ConcurrentSkipListMap.class, false);
+    expected += ClassSize.estimateBase(CopyOnWriteArraySet.class, false);
+    expected += ClassSize.estimateBase(CopyOnWriteArrayList.class, false);
     if(expected != actual) {
       ClassSize.estimateBase(cl, true);
       ClassSize.estimateBase(ReentrantReadWriteLock.class, true);
       ClassSize.estimateBase(AtomicLong.class, true);
       ClassSize.estimateBase(ConcurrentSkipListMap.class, true);
+      ClassSize.estimateBase(CopyOnWriteArraySet.class, true);
+      ClassSize.estimateBase(CopyOnWriteArrayList.class, true);
       assertEquals(expected, actual);
     }
     

Modified: hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/TestMemStore.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/TestMemStore.java?rev=805183&r1=805182&r2=805183&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/TestMemStore.java (original)
+++ hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/TestMemStore.java Mon
Aug 17 22:21:00 2009
@@ -90,6 +90,10 @@
       s.close();
     }
     assertEquals(rowCount, count);
+    for (int i = 0; i < memstorescanners.length; i++) {
+      memstorescanners[0].close();
+    }
+    memstorescanners = this.memstore.getScanners();
     // Now assert can count same number even if a snapshot mid-scan.
     s = new StoreScanner(scan, null, HConstants.LATEST_TIMESTAMP,
       this.memstore.comparator, null, memstorescanners);
@@ -112,6 +116,10 @@
       s.close();
     }
     assertEquals(rowCount, count);
+    for (int i = 0; i < memstorescanners.length; i++) {
+      memstorescanners[0].close();
+    }
+    memstorescanners = this.memstore.getScanners();
     // Assert that new values are seen in kvset as we scan.
     long ts = System.currentTimeMillis();
     s = new StoreScanner(scan, null, HConstants.LATEST_TIMESTAMP,
@@ -124,8 +132,7 @@
         // Assert the stuff is coming out in right order.
         assertTrue(Bytes.compareTo(Bytes.toBytes(count), result.get(0).getRow()) == 0);
         // Row count is same as column count.
-        // TODO PUTBACK assertEquals("count=" + count + ", result=" + result,
-        //  rowCount, result.size());
+        assertEquals("count=" + count + ", result=" + result, rowCount, result.size());
         count++;
         if (count == snapshotIndex) {
           this.memstore.snapshot();
@@ -407,8 +414,7 @@
       assertEquals(expected.get(i), result.get(i));
     }
   }
-  
-  
+
   //////////////////////////////////////////////////////////////////////////////
   // Delete tests
   //////////////////////////////////////////////////////////////////////////////
@@ -637,4 +643,4 @@
     return new KeyValue(row, Bytes.toBytes("test_col:"),
       HConstants.LATEST_TIMESTAMP, value);
   }
-}
\ No newline at end of file
+}



Mime
View raw message