hbase-commits mailing list archives

From: st...@apache.org
Subject: svn commit: r651017 - in /hadoop/hbase/trunk/src: java/org/apache/hadoop/hbase/regionserver/ test/org/apache/hadoop/hbase/
Date: Wed, 23 Apr 2008 19:21:31 GMT
Author: stack
Date: Wed Apr 23 12:21:26 2008
New Revision: 651017

URL: http://svn.apache.org/viewvc?rev=651017&view=rev
Log:
HBASE-588 Still a 'hole' in scanners, even after HBASE-532

Add a ChangedReadersObserver interface.  HStore notifies registered
observers when its list of HStoreFile Readers changes -- at flush time
and at compaction time.  Scanners are currently the only observers.
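
Purely as an illustration, here is a minimal, self-contained sketch of the
observer pattern this commit introduces.  The Store and Scanner classes below
are simplified stand-ins, not the real HStore/StoreFileScanner; only the
names ChangedReadersObserver, updateReaders, addChangedReaderObserver,
deleteChangedReaderObserver and notifyChangedReadersObservers mirror the commit.

  import java.io.IOException;
  import java.util.Collections;
  import java.util.HashSet;
  import java.util.Set;

  interface ChangedReadersObserver {
    void updateReaders() throws IOException;
  }

  class Store {
    private final Set<ChangedReadersObserver> observers =
      Collections.synchronizedSet(new HashSet<ChangedReadersObserver>());

    void addChangedReaderObserver(ChangedReadersObserver o) {
      observers.add(o);
    }

    void deleteChangedReaderObserver(ChangedReadersObserver o) {
      observers.remove(o);
    }

    // Called after a flush or compaction has swapped the set of file readers.
    void notifyChangedReadersObservers() throws IOException {
      synchronized (observers) {
        for (ChangedReadersObserver o : observers) {
          o.updateReaders();
        }
      }
    }
  }

  class Scanner implements ChangedReadersObserver {
    Scanner(Store store) {
      store.addChangedReaderObserver(this);  // register in the constructor
    }
    public void updateReaders() {
      // Reopen readers against the store's current list of files.
      System.out.println("readers changed; reopening");
    }
  }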

Also fix a deadlock during flushing by changing lock types and moving the
flush request out from under the update lock.
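
A minimal sketch of that locking pattern, assuming a hypothetical
UpdateExample class rather than the real HRegion: decide whether a flush is
needed while holding the update read lock, but only issue the request after
the lock is released.

  import java.util.concurrent.locks.ReentrantReadWriteLock;

  class UpdateExample {
    private final ReentrantReadWriteLock updatesLock = new ReentrantReadWriteLock();
    private long memcacheSize = 0;
    private final long memcacheFlushSize = 64 * 1024 * 1024;

    void update(long entrySize) {
      boolean flush = false;
      updatesLock.readLock().lock();
      try {
        memcacheSize += entrySize;                 // apply the edit under the lock
        flush = memcacheSize > memcacheFlushSize;  // only decide here
      } finally {
        updatesLock.readLock().unlock();
      }
      if (flush) {
        requestFlush();  // request the flush outside the lock
      }
    }

    private void requestFlush() {
      System.out.println("flush requested");
    }
  }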

M  src/test/org/apache/hadoop/hbase/MultiRegionTable.java
    (getCacheFlushListener): Renamed as getFlushRequester
M  src/java/org/apache/hadoop/hbase/regionserver/HStore.java
    Added synchronization on this.storefiles.
    (activeScanners, newScannerLock): Removed.  We no longer try to
    block out scanners when compacting (it turns out equivalent
    scanner-blocking functionality lives up in HRegion, used around
    closing/splitting).
    (changedReadersObservers): Added.
    (updateReaders): New method that manages insertion of the new
    reader at flush time.  Also calls the new notifyChangedReadersObservers.
    (notifyChangedReadersObservers, addChangedReaderObserver,
      deleteChangedReaderObserver): Added.
    (completeCompaction): Previously, when deleting old store files, we'd
    remove from this.storefiles and delete all in one step; now we
    do the remove first, notify all observers of the change in readers,
    and then do the delete so observers have a chance to clean up any old
    references to files about to be deleted.  Removed all the lockout
    of new scanner creation and the wait for old scanners to come in.
    (A simplified sketch of this ordering follows these file notes.)
    (updateActiveScanners): Removed.
    (getStorefiles): Accessor.  Added.
M  src/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
    Added implementation of new ChangedReadersObserver interface.
    Added a lock that we hold when 'nexting' and when changing the
    set of readers out from under the scanner.
    Changed the constructor, moving the bulk of it into a new openReaders
    method that we reuse when the list of Readers changes.
    (next): Moved the head of this method into new getNextViableRow (the
    logic formerly built chosenRow and chosenTimestamp).  The new method
    returns a simple data structure of row and timestamp (ViableRow).
    (close): On close, remove ourselves as ChangedReadersObserver (we
    added ourselves in the constructor).
    (updateReaders): Changes the set of Readers out from under the
    Scanner.
A  src/java/org/apache/hadoop/hbase/regionserver/ChangedReadersObserver.java
    Added.
M  src/java/org/apache/hadoop/hbase/regionserver/Flusher.java
    Changed name of the interface we implement from CacheFlushListener to
    FlushRequester.
D  src/java/org/apache/hadoop/hbase/regionserver/CacheFlushListener.java
    Renamed as FlushRequester.
M  src/java/org/apache/hadoop/hbase/regionserver/HStoreScanner.java
    Removed update of activeScanners.
A  src/java/org/apache/hadoop/hbase/regionserver/FlushRequester.java
    Added.  Rename of CacheFlushListener.
M  src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
    Renamed method getCacheFlushListener as getFlushRequester.
M  src/java/org/apache/hadoop/hbase/regionserver/HAbstractScanner.java
    Formatting.
M  src/java/org/apache/hadoop/hbase/regionserver/HRegion.java
    CacheFlushListener was renamed as FlushRequester.
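
As referenced in the HStore.java notes above, a simplified sketch of the new
completeCompaction ordering -- remove from the live map, notify observers,
then delete.  The string-keyed file map and Runnable observers are
placeholders, not the real HStoreFile/MapFile machinery.

  import java.util.ArrayList;
  import java.util.Collection;
  import java.util.HashMap;
  import java.util.List;
  import java.util.Map;
  import java.util.SortedMap;
  import java.util.TreeMap;

  class CompactionOrderingSketch {
    final SortedMap<Long, String> storefiles = new TreeMap<Long, String>();
    final List<Runnable> changedReadersObservers = new ArrayList<Runnable>();

    void completeCompaction(Collection<String> compactedFiles, long newId,
        String compactedFile) {
      Map<Long, String> toDelete = new HashMap<Long, String>();
      for (Map.Entry<Long, String> e : storefiles.entrySet()) {
        if (compactedFiles.contains(e.getValue())) {
          toDelete.put(e.getKey(), e.getValue());
        }
      }
      // 1. Swap the live map: drop replaced files, add the compacted one.
      storefiles.keySet().removeAll(toDelete.keySet());
      storefiles.put(newId, compactedFile);
      // 2. Notify observers so scanners can reopen against the new file list.
      for (Runnable o : changedReadersObservers) {
        o.run();
      }
      // 3. Only now delete the replaced files, once no observer holds them.
      for (String old : toDelete.values()) {
        System.out.println("deleting " + old);
      }
    }
  }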

Added:
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/ChangedReadersObserver.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/FlushRequester.java
Removed:
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/CacheFlushListener.java
Modified:
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/Flusher.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HAbstractScanner.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HStore.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HStoreScanner.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
    hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/MultiRegionTable.java

Added: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/ChangedReadersObserver.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/ChangedReadersObserver.java?rev=651017&view=auto
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/ChangedReadersObserver.java (added)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/ChangedReadersObserver.java Wed Apr 23 12:21:26 2008
@@ -0,0 +1,34 @@
+/**
+ * Copyright 2008 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+
+
+/**
+ * If set of MapFile.Readers in Store change, implementors are notified.
+ */
+public interface ChangedReadersObserver {
+  /**
+   * Notify observers.
+   */
+  void updateReaders() throws IOException;
+}
\ No newline at end of file

Added: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/FlushRequester.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/FlushRequester.java?rev=651017&view=auto
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/FlushRequester.java (added)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/FlushRequester.java Wed Apr 23 12:21:26 2008
@@ -0,0 +1,36 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver;
+
+/**
+ * Implementors of this interface want to be notified when an HRegion
+ * determines that a cache flush is needed. A FlushRequester (or null)
+ * must be passed to the HRegion constructor so it knows who to call when it
+ * has a filled memcache.
+ */
+public interface FlushRequester {
+  /**
+   * Tell the listener the cache needs to be flushed.
+   * 
+   * @param region the HRegion requesting the cache flush
+   */
+  void request(HRegion region);
+}
\ No newline at end of file

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/Flusher.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/Flusher.java?rev=651017&r1=651016&r2=651017&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/Flusher.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/Flusher.java Wed Apr 23 12:21:26 2008
@@ -38,8 +38,11 @@
 import org.apache.hadoop.hbase.DroppedSnapshotException;
 import org.apache.hadoop.hbase.RemoteExceptionHandler;
 
-/** Flush cache upon request */
-class Flusher extends Thread implements CacheFlushListener {
+/**
+ * Thread that flushes cache on request
+ * @see FlushRequester
+ */
+class Flusher extends Thread implements FlushRequester {
   static final Log LOG = LogFactory.getLog(Flusher.class);
   private final BlockingQueue<HRegion> flushQueue =
     new LinkedBlockingQueue<HRegion>();
@@ -110,13 +113,8 @@
   }
   
   /** {@inheritDoc} */
-  public void flushRequested(HRegion r) {
-    synchronized (regionsInQueue) {
-      if (!regionsInQueue.contains(r)) {
-        regionsInQueue.add(r);
-        flushQueue.add(r);
-      }
-    }
+  public void request(HRegion r) {
+    addRegion(r, System.currentTimeMillis());
   }
   
   /**
@@ -204,18 +202,41 @@
       // Queue up regions for optional flush if they need it
       Set<HRegion> regions = server.getRegionsToCheck();
       for (HRegion region: regions) {
-        synchronized (regionsInQueue) {
-          if (!regionsInQueue.contains(region) &&
-              (now - optionalFlushPeriod) > region.getLastFlushTime()) {
-            regionsInQueue.add(region);
-            flushQueue.add(region);
-            region.setLastFlushTime(now);
-          }
-        }
+        optionallyAddRegion(region, now);
+      }
+    }
+  }
+
+  /*
+   * Add region if not already added and if optional flush period has been
+   * exceeded.
+   * @param r Region to add.
+   * @param now The 'now' to use.  Set last flush time to this value.
+   */
+  private void optionallyAddRegion(final HRegion r, final long now) {
+    synchronized (regionsInQueue) {
+      if (!regionsInQueue.contains(r) &&
+          (now - optionalFlushPeriod) > r.getLastFlushTime()) {
+        addRegion(r, now);
       }
     }
   }
   
+  /*
+   * Add region if not already added.
+   * @param r Region to add.
+   * @param now The 'now' to use.  Set last flush time to this value.
+   */
+  private void addRegion(final HRegion r, final long now) {
+    synchronized (regionsInQueue) {
+      if (!regionsInQueue.contains(r)) {
+        regionsInQueue.add(r);
+        flushQueue.add(r);
+        r.setLastFlushTime(now);
+      }
+    }
+  }
+
   /**
    * Check if the regionserver's memcache memory usage is greater than the 
    * limit. If so, flush regions with the biggest memcaches until we're down

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HAbstractScanner.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HAbstractScanner.java?rev=651017&r1=651016&r2=651017&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HAbstractScanner.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HAbstractScanner.java Wed Apr 23 12:21:26 2008
@@ -107,10 +107,15 @@
     }
   }
 
-  protected TreeMap<Text, Vector<ColumnMatcher>> okCols;        // Holds matchers for each column family 
+  // Holds matchers for each column family 
+  protected TreeMap<Text, Vector<ColumnMatcher>> okCols;
+  
+  // True when scanning is done
+  protected volatile boolean scannerClosed = false;
+  
+  // The timestamp to match entries against
+  protected long timestamp;
   
-  protected boolean scannerClosed = false;                      // True when scanning is done
-  protected long timestamp;                                     // The timestamp to match entries against
   private boolean wildcardMatch;
   private boolean multipleMatchers;
 

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=651017&r1=651016&r2=651017&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java Wed Apr 23 12:21:26 2008
@@ -63,6 +63,7 @@
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.DroppedSnapshotException;
+import org.apache.hadoop.hbase.NotServingRegionException;
 import org.apache.hadoop.hbase.WrongRegionException;
 import org.apache.hadoop.hbase.ipc.HRegionInterface;
 
@@ -341,18 +342,22 @@
     volatile boolean writesEnabled = true;
   }
 
-  volatile WriteState writestate = new WriteState();
+  private volatile WriteState writestate = new WriteState();
 
   final int memcacheFlushSize;
   private volatile long lastFlushTime;
-  final CacheFlushListener flushListener;
-  final int blockingMemcacheSize;
-  protected final long threadWakeFrequency;
-  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
-  private final Integer updateLock = new Integer(0);
+  final FlushRequester flushListener;
+  private final int blockingMemcacheSize;
+  final long threadWakeFrequency;
+  // Used to guard splits and closes
+  private final ReentrantReadWriteLock splitsAndClosesLock =
+    new ReentrantReadWriteLock();
+  // Stop updates lock
+  private final ReentrantReadWriteLock updatesLock =
+    new ReentrantReadWriteLock();
   private final Integer splitLock = new Integer(0);
   private final long minSequenceId;
-  final AtomicInteger activeScannerCount = new AtomicInteger(0);
+  private final AtomicInteger activeScannerCount = new AtomicInteger(0);
 
   //////////////////////////////////////////////////////////////////////////////
   // Constructor
@@ -381,7 +386,7 @@
    */
   public HRegion(Path basedir, HLog log, FileSystem fs, HBaseConfiguration conf, 
       HRegionInfo regionInfo, Path initialFiles,
-      CacheFlushListener flushListener) throws IOException {
+      FlushRequester flushListener) throws IOException {
     this(basedir, log, fs, conf, regionInfo, initialFiles, flushListener, null);
   }
   
@@ -410,7 +415,7 @@
    */
   public HRegion(Path basedir, HLog log, FileSystem fs, HBaseConfiguration conf, 
       HRegionInfo regionInfo, Path initialFiles,
-      CacheFlushListener flushListener, final Progressable reporter)
+      FlushRequester flushListener, final Progressable reporter)
     throws IOException {
     
     this.basedir = basedir;
@@ -566,20 +571,17 @@
           }
         }
       }
-      lock.writeLock().lock();
-      LOG.debug("new updates and scanners for region " + regionName +
-          " disabled");
-      
+      splitsAndClosesLock.writeLock().lock();
+      LOG.debug("Updates and scanners for region " + regionName + " disabled");
       try {
-        // Wait for active scanners to finish. The write lock we hold will prevent
-        // new scanners from being created.
+        // Wait for active scanners to finish. The write lock we hold will
+        // prevent new scanners from being created.
         synchronized (activeScannerCount) {
           while (activeScannerCount.get() != 0) {
             LOG.debug("waiting for " + activeScannerCount.get() +
                 " scanners to finish");
             try {
               activeScannerCount.wait();
-
             } catch (InterruptedException e) {
               // continue
             }
@@ -620,7 +622,7 @@
         LOG.info("closed " + this.regionInfo.getRegionName());
         return result;
       } finally {
-        lock.writeLock().unlock();
+        splitsAndClosesLock.writeLock().unlock();
       }
     }
   }
@@ -868,10 +870,8 @@
         }
       }
       doRegionCompactionCleanup();
-      LOG.info("compaction completed on region " + getRegionName() +
-          ". Took " +
-          StringUtils.formatTimeDiff(System.currentTimeMillis(), startTime));
-      
+      LOG.info("compaction completed on region " + getRegionName() + " in " +
+        StringUtils.formatTimeDiff(System.currentTimeMillis(), startTime));
     } finally {
       synchronized (writestate) {
         writestate.compacting = false;
@@ -919,11 +919,12 @@
       }
     }
     try {
-      lock.readLock().lock();                      // Prevent splits and closes
+      // Prevent splits and closes
+      splitsAndClosesLock.readLock().lock();
       try {
         return internalFlushcache();
       } finally {
-        lock.readLock().unlock();
+        splitsAndClosesLock.readLock().unlock();
       }
     } finally {
       synchronized (writestate) {
@@ -984,10 +985,13 @@
     // to do this for a moment.  Its quick.  The subsequent sequence id that
     // goes into the HLog after we've flushed all these snapshots also goes
     // into the info file that sits beside the flushed files.
-    synchronized (updateLock) {
+    this.updatesLock.writeLock().lock();
+    try {
       for (HStore s: stores.values()) {
         s.snapshot();
       }
+    } finally {
+      this.updatesLock.writeLock().unlock();
     }
     long sequenceId = log.startCacheFlush();
 
@@ -1150,7 +1154,7 @@
     
     HStoreKey key = null;
     checkRow(row);
-    lock.readLock().lock();
+    splitsAndClosesLock.readLock().lock();
     try {
       // examine each column family for the preceeding or matching key
       for(Text colFamily : stores.keySet()){
@@ -1188,7 +1192,7 @@
       
       return new RowResult(key.getRow(), cellsWritten);
     } finally {
-      lock.readLock().unlock();
+      splitsAndClosesLock.readLock().unlock();
     }
   }
 
@@ -1235,7 +1239,7 @@
   public InternalScanner getScanner(Text[] cols, Text firstRow,
     long timestamp, RowFilterInterface filter) 
   throws IOException {
-    lock.readLock().lock();
+    splitsAndClosesLock.readLock().lock();
     try {
       if (this.closed.get()) {
         throw new IOException("Region " + this.getRegionName().toString() +
@@ -1257,7 +1261,7 @@
       return new HScanner(cols, firstRow, timestamp,
         storelist.toArray(new HStore [storelist.size()]), filter);
     } finally {
-      lock.readLock().unlock();
+      splitsAndClosesLock.readLock().unlock();
     }
   }
 
@@ -1506,15 +1510,15 @@
    * @throws IOException
    */
   private void update(final TreeMap<HStoreKey, byte []> updatesByColumn)
-    throws IOException {
-    
+  throws IOException {
     if (updatesByColumn == null || updatesByColumn.size() <= 0) {
       return;
     }
-    synchronized (updateLock) {                         // prevent a cache flush
+    boolean flush = false;
+    this.updatesLock.readLock().lock();
+    try {
       this.log.append(regionInfo.getRegionName(),
-          regionInfo.getTableDesc().getName(), updatesByColumn);
-
+        regionInfo.getTableDesc().getName(), updatesByColumn);
       long size = 0;
       for (Map.Entry<HStoreKey, byte[]> e: updatesByColumn.entrySet()) {
         HStoreKey key = e.getKey();
@@ -1522,12 +1526,15 @@
         size = this.memcacheSize.addAndGet(getEntrySize(key, val));
         stores.get(HStoreKey.extractFamily(key.getColumn())).add(key, val);
       }
-      if (this.flushListener != null && !this.flushRequested &&
-          size > this.memcacheFlushSize) {
-        // Request a cache flush
-        this.flushListener.flushRequested(this);
-        this.flushRequested = true;
-      }
+      flush = this.flushListener != null && !this.flushRequested &&
+        size > this.memcacheFlushSize;
+    } finally {
+      this.updatesLock.readLock().unlock();
+    }
+    if (flush) {
+      // Request a cache flush.  Do it outside update lock.
+      this.flushListener.request(this);
+      this.flushRequested = true;
     }
   }
   
@@ -1597,11 +1604,11 @@
    */
   long obtainRowLock(Text row) throws IOException {
     checkRow(row);
-    lock.readLock().lock();
+    splitsAndClosesLock.readLock().lock();
     try {
       if (this.closed.get()) {
-        throw new IOException("Region " + this.getRegionName().toString() +
-          " closed");
+        throw new NotServingRegionException("Region " +
+          this.getRegionName().toString() + " closed");
       }
       synchronized (rowsToLocks) {
         while (rowsToLocks.get(row) != null) {
@@ -1618,7 +1625,7 @@
         return lid.longValue();
       }
     } finally {
-      lock.readLock().unlock();
+      splitsAndClosesLock.readLock().unlock();
     }
   }
   

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=651017&r1=651016&r2=651017&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Wed Apr 23 12:21:26 2008
@@ -1264,8 +1264,8 @@
     return this.requestCount;
   }
 
-  /** @return reference to CacheFlushListener */
-  public CacheFlushListener getCacheFlushListener() {
+  /** @return reference to FlushRequester */
+  public FlushRequester getFlushRequester() {
     return this.cacheFlusher;
   }
   

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HStore.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HStore.java?rev=651017&r1=651016&r2=651017&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HStore.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HStore.java Wed Apr 23 12:21:26 2008
@@ -24,12 +24,12 @@
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.SortedMap;
 import java.util.TreeMap;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
@@ -41,30 +41,29 @@
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.BloomFilterDescriptor;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HStoreKey;
+import org.apache.hadoop.hbase.RemoteExceptionHandler;
 import org.apache.hadoop.hbase.filter.RowFilterInterface;
+import org.apache.hadoop.hbase.io.Cell;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.io.TextSequence;
+import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.io.MapFile;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.hbase.BloomFilterDescriptor;
 import org.onelab.filter.BloomFilter;
 import org.onelab.filter.CountingBloomFilter;
 import org.onelab.filter.Filter;
 import org.onelab.filter.RetouchedBloomFilter;
 
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HStoreKey;
-import org.apache.hadoop.hbase.RemoteExceptionHandler;
-import org.apache.hadoop.hbase.io.Cell;
-import org.apache.hadoop.hbase.util.FSUtils;
-
 /**
  * HStore maintains a bunch of data files.  It is responsible for maintaining 
  * the memory/file hierarchy and for periodic flushes to disk and compacting 
@@ -102,7 +101,6 @@
   private final Integer flushLock = new Integer(0);
 
   private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
-  final AtomicInteger activeScanners = new AtomicInteger(0);
 
   final Text storeName;
 
@@ -110,7 +108,7 @@
    * Sorted Map of readers keyed by sequence id (Most recent should be last in
    * in list).
    */
-  final SortedMap<Long, HStoreFile> storefiles =
+  private final SortedMap<Long, HStoreFile> storefiles =
     Collections.synchronizedSortedMap(new TreeMap<Long, HStoreFile>());
   
   /*
@@ -128,9 +126,8 @@
   private final Path compactionDir;
   private final Integer compactLock = new Integer(0);
   private final int compactionThreshold;
-  
-  private final ReentrantReadWriteLock newScannerLock =
-    new ReentrantReadWriteLock();
+  private final Set<ChangedReadersObserver> changedReaderObservers =
+    Collections.synchronizedSet(new HashSet<ChangedReadersObserver>());
 
   /**
    * An HStore is a set of zero or more MapFiles, which stretch backwards over 
@@ -563,7 +560,9 @@
       for (MapFile.Reader reader: this.readers.values()) {
         reader.close();
       }
-      result = new ArrayList<HStoreFile>(storefiles.values());
+      synchronized (this.storefiles) {
+        result = new ArrayList<HStoreFile>(storefiles.values());
+      }
       LOG.debug("closed " + this.storeName);
       return result;
     } finally {
@@ -659,26 +658,68 @@
       }
 
       // D. Finally, make the new MapFile available.
-      this.lock.writeLock().lock();
-      try {
-        Long flushid = Long.valueOf(logCacheFlushId);
-        // Open the map file reader.
-        this.readers.put(flushid,
-            flushedFile.getReader(this.fs, this.bloomFilter));
-        this.storefiles.put(flushid, flushedFile);
-        if(LOG.isDebugEnabled()) {
-          LOG.debug("Added " + FSUtils.getPath(flushedFile.getMapFilePath()) +
-            " with " + entries +
-            " entries, sequence id " + logCacheFlushId + ", data size " +
-            StringUtils.humanReadableInt(flushed) + ", file size " +
-            StringUtils.humanReadableInt(newStoreSize));
-        }
-      } finally {
-        this.lock.writeLock().unlock();
+      updateReaders(logCacheFlushId, flushedFile);
+      if(LOG.isDebugEnabled()) {
+        LOG.debug("Added " + FSUtils.getPath(flushedFile.getMapFilePath()) +
+          " with " + entries +
+          " entries, sequence id " + logCacheFlushId + ", data size " +
+          StringUtils.humanReadableInt(flushed) + ", file size " +
+          StringUtils.humanReadableInt(newStoreSize));
       }
     }
     return flushed;
   }
+  
+  /*
+   * Change readers adding into place the Reader produced by this new flush.
+   * @param logCacheFlushId
+   * @param flushedFile
+   * @throws IOException
+   */
+  private void updateReaders(final long logCacheFlushId,
+      final HStoreFile flushedFile)
+  throws IOException {
+    this.lock.writeLock().lock();
+    try {
+      Long flushid = Long.valueOf(logCacheFlushId);
+      // Open the map file reader.
+      this.readers.put(flushid,
+        flushedFile.getReader(this.fs, this.bloomFilter));
+      this.storefiles.put(flushid, flushedFile);
+      // Tell listeners of the change in readers.
+      notifyChangedReadersObservers();
+    } finally {
+      this.lock.writeLock().unlock();
+    }
+  }
+  
+  /*
+   * Notify all observers that set of Readers has changed.
+   * @throws IOException
+   */
+  private void notifyChangedReadersObservers() throws IOException {
+    synchronized (this.changedReaderObservers) {
+      for (ChangedReadersObserver o: this.changedReaderObservers) {
+        o.updateReaders();
+      }
+    }
+  }
+  
+  /*
+   * @param o Observer who wants to know about changes in set of Readers
+   */
+  void addChangedReaderObserver(ChangedReadersObserver o) {
+    this.changedReaderObservers.add(o);
+  }
+  
+  /*
+   * @param o Observer no longer interested in changes in set of Readers.
+   */
+  void deleteChangedReaderObserver(ChangedReadersObserver o) {
+    if (!this.changedReaderObservers.remove(o)) {
+      LOG.warn("Not in set" + o);
+    }
+  }
 
   //////////////////////////////////////////////////////////////////////////////
   // Compaction
@@ -724,12 +765,6 @@
           return checkSplit();
         }
 
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("started compaction of " + filesToCompact.size() +
-            " files " + filesToCompact.toString() + " into " +
-            compactionDir.toUri().getPath());
-        }
-
         // Storefiles are keyed by sequence id. The oldest file comes first.
         // We need to return out of here a List that has the newest file first.
         Collections.reverse(filesToCompact);
@@ -737,15 +772,20 @@
         // The max-sequenceID in any of the to-be-compacted TreeMaps is the 
         // last key of storefiles.
 
-        maxId = this.storefiles.lastKey();
+        maxId = this.storefiles.lastKey().longValue();
       }
 
       // Step through them, writing to the brand-new MapFile
       HStoreFile compactedOutputFile = new HStoreFile(conf, fs, 
           this.compactionDir, info.getEncodedName(), family.getFamilyName(),
           -1L, null);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("started compaction of " + filesToCompact.size() +
+          " files " + filesToCompact.toString() + " into " +
+          FSUtils.getPath(compactedOutputFile.getMapFilePath()));
+      }
       MapFile.Writer compactedOut = compactedOutputFile.getWriter(this.fs,
-          this.compression, this.bloomFilter);
+        this.compression, this.bloomFilter);
       try {
         compactHStoreFiles(compactedOut, filesToCompact);
       } finally {
@@ -955,112 +995,92 @@
    * 
    * <p>Moving the compacted TreeMap into place means:
    * <pre>
-   * 1) Wait for active scanners to exit
-   * 2) Acquiring the write-lock
-   * 3) Moving the new compacted MapFile into place
-   * 4) Unloading all the replaced MapFiles and close.
-   * 5) Deleting all the replaced MapFile files.
-   * 6) Loading the new TreeMap.
-   * 7) Compute new store size
-   * 8) Releasing the write-lock
-   * 9) Allow new scanners to proceed.
+   * 1) Moving the new compacted MapFile into place
+   * 2) Unload all replaced MapFiles, close and collect list to delete.
+   * 3) Loading the new TreeMap.
+   * 4) Compute new store size
    * </pre>
    * 
    * @param compactedFiles list of files that were compacted
    * @param compactedFile HStoreFile that is the result of the compaction
    * @throws IOException
    */
-  private void completeCompaction(List<HStoreFile> compactedFiles,
-      HStoreFile compactedFile) throws IOException {
-    
-    // 1. Wait for active scanners to exit
-    
-    newScannerLock.writeLock().lock();                  // prevent new scanners
+  private void completeCompaction(final List<HStoreFile> compactedFiles,
+    final HStoreFile compactedFile)
+  throws IOException {
+    this.lock.writeLock().lock();
     try {
-      synchronized (activeScanners) {
-        while (activeScanners.get() != 0) {
-          try {
-            activeScanners.wait();
-          } catch (InterruptedException e) {
-            // continue
-          }
-        }
-
-        // 2. Acquiring the HStore write-lock
-        this.lock.writeLock().lock();
+      // 1. Moving the new MapFile into place.
+      HStoreFile finalCompactedFile = new HStoreFile(conf, fs, basedir,
+        info.getEncodedName(), family.getFamilyName(), -1, null);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("moving " + FSUtils.getPath(compactedFile.getMapFilePath()) +
+          " to " + FSUtils.getPath(finalCompactedFile.getMapFilePath()));
+      }
+      if (!compactedFile.rename(this.fs, finalCompactedFile)) {
+        LOG.error("Failed move of compacted file " +
+          finalCompactedFile.getMapFilePath().toString());
+        return;
       }
 
-      try {
-        // 3. Moving the new MapFile into place.
-        
-        HStoreFile finalCompactedFile = new HStoreFile(conf, fs, basedir,
-            info.getEncodedName(), family.getFamilyName(), -1, null);
-        if(LOG.isDebugEnabled()) {
-          LOG.debug("moving " +
-            FSUtils.getPath(compactedFile.getMapFilePath()) +
-            " to " + FSUtils.getPath(finalCompactedFile.getMapFilePath()));
-        }
-        if (!compactedFile.rename(this.fs, finalCompactedFile)) {
-          LOG.error("Failed move of compacted file " +
-            finalCompactedFile.getMapFilePath().toString());
-          return;
-        }
-
-        // 4. and 5. Unload all the replaced MapFiles, close and delete.
-        
-        synchronized (storefiles) {
-          List<Long> toDelete = new ArrayList<Long>();
-          for (Map.Entry<Long, HStoreFile> e: this.storefiles.entrySet()) {
-            if (!compactedFiles.contains(e.getValue())) {
-              continue;
-            }
-            Long key = e.getKey();
-            MapFile.Reader reader = this.readers.remove(key);
-            if (reader != null) {
-              reader.close();
-            }
-            toDelete.add(key);
+      // 2. Unload all replaced MapFiles, close and collect list to delete.
+      synchronized (storefiles) {
+        Map<Long, HStoreFile> toDelete = new HashMap<Long, HStoreFile>();
+        for (Map.Entry<Long, HStoreFile> e : this.storefiles.entrySet()) {
+          if (!compactedFiles.contains(e.getValue())) {
+            continue;
           }
+          Long key = e.getKey();
+          MapFile.Reader reader = this.readers.remove(key);
+          if (reader != null) {
+            reader.close();
+          }
+          toDelete.put(key, e.getValue());
+        }
 
-          try {
-            for (Long key: toDelete) {
-              HStoreFile hsf = this.storefiles.remove(key);
-              hsf.delete();
-            }
-
-            // 6. Loading the new TreeMap.
-            Long orderVal = Long.valueOf(finalCompactedFile.loadInfo(fs));
-            this.readers.put(orderVal,
-                // Use a block cache (if configured) for this reader since
-                // it is the only one.
-                finalCompactedFile.getReader(this.fs, this.bloomFilter,
-                    family.isBlockCacheEnabled()));
-            this.storefiles.put(orderVal, finalCompactedFile);
-          } catch (IOException e) {
-            e = RemoteExceptionHandler.checkIOException(e);
-            LOG.error("Failed replacing compacted files for " + this.storeName +
-                ". Compacted file is " + finalCompactedFile.toString() +
-                ".  Files replaced are " + compactedFiles.toString() +
-                " some of which may have been already removed", e);
+        try {
+          // 3. Loading the new TreeMap.
+          // Change this.storefiles so it reflects new state but do not
+          // delete old store files until we have sent out notification of
+          // change in case old files are still being accessed by outstanding
+          // scanners.
+          for (Long key : toDelete.keySet()) {
+            this.storefiles.remove(key);
           }
-          // 7. Compute new store size
-          storeSize = 0L;
-          for (HStoreFile hsf: storefiles.values()) {
-            storeSize += hsf.length();
+          // Add new compacted Reader and store file.
+          Long orderVal = Long.valueOf(finalCompactedFile.loadInfo(fs));
+          this.readers.put(orderVal,
+          // Use a block cache (if configured) for this reader since
+              // it is the only one.
+              finalCompactedFile.getReader(this.fs, this.bloomFilter, family
+                  .isBlockCacheEnabled()));
+          this.storefiles.put(orderVal, finalCompactedFile);
+          // Tell observers that list of Readers has changed.
+          notifyChangedReadersObservers();
+          // Finally, delete old store files.
+          for (HStoreFile hsf : toDelete.values()) {
+            hsf.delete();
           }
+        } catch (IOException e) {
+          e = RemoteExceptionHandler.checkIOException(e);
+          LOG.error("Failed replacing compacted files for " + this.storeName +
+            ". Compacted file is " + finalCompactedFile.toString() +
+            ".  Files replaced are " + compactedFiles.toString() +
+            " some of which may have been already removed", e);
+        }
+        // 4. Compute new store size
+        storeSize = 0L;
+        for (HStoreFile hsf : storefiles.values()) {
+          storeSize += hsf.length();
         }
-      } finally {
-        // 8. Releasing the write-lock
-        this.lock.writeLock().unlock();
       }
     } finally {
-      // 9. Allow new scanners to proceed.
-      newScannerLock.writeLock().unlock();
+      this.lock.writeLock().unlock();
     }
   }
 
-  //////////////////////////////////////////////////////////////////////////////
-  // Accessors.  
+  // ////////////////////////////////////////////////////////////////////////////
+  // Accessors.
   // (This is the only section that is directly useful!)
   //////////////////////////////////////////////////////////////////////////////
   
@@ -1635,20 +1655,13 @@
    * Return a scanner for both the memcache and the HStore files
    */
   InternalScanner getScanner(long timestamp, Text targetCols[],
-      Text firstRow, RowFilterInterface filter) throws IOException {
-
-    newScannerLock.readLock().lock();           // ability to create a new
-                                                // scanner during a compaction
+      Text firstRow, RowFilterInterface filter)
+  throws IOException {
+    lock.readLock().lock();
     try {
-      lock.readLock().lock();                   // lock HStore
-      try {
-        return new HStoreScanner(this, targetCols, firstRow, timestamp, filter);
-
-      } finally {
-        lock.readLock().unlock();
-      }
+      return new HStoreScanner(this, targetCols, firstRow, timestamp, filter);
     } finally {
-      newScannerLock.readLock().unlock();
+      lock.readLock().unlock();
     }
   }
 
@@ -1689,20 +1702,15 @@
     }
     return m.groupCount() > 1 && m.group(2) != null;
   }
-  
-  protected void updateActiveScanners() {
-    synchronized (activeScanners) {
-      int numberOfScanners = activeScanners.decrementAndGet();
-      if (numberOfScanners < 0) {
-        LOG.error(storeName +
-            " number of active scanners less than zero: " +
-            numberOfScanners + " resetting to zero");
-        activeScanners.set(0);
-        numberOfScanners = 0;
-      }
-      if (numberOfScanners == 0) {
-        activeScanners.notifyAll();
-      }
+
+  /**
+   * @return Current list of store files.
+   */
+  SortedMap<Long, HStoreFile> getStorefiles() {
+    synchronized (this.storefiles) {
+      SortedMap<Long, HStoreFile> copy =
+        new TreeMap<Long, HStoreFile>(this.storefiles);
+      return copy;
     }
   }
-}
+}
\ No newline at end of file

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HStoreScanner.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HStoreScanner.java?rev=651017&r1=651016&r2=651017&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HStoreScanner.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HStoreScanner.java Wed Apr 23 12:21:26 2008
@@ -66,7 +66,6 @@
     try {
       scanners[0] = store.memcache.getScanner(timestamp, targetCols, firstRow);
       scanners[1] = new StoreFileScanner(store, timestamp, targetCols, firstRow);
-      
       for (int i = 0; i < scanners.length; i++) {
         if (scanners[i].isWildcardScanner()) {
           this.wildcardMatch = true;
@@ -75,7 +74,6 @@
           this.multipleMatchers = true;
         }
       }
-
     } catch(IOException e) {
       for (int i = 0; i < this.scanners.length; i++) {
         if(scanners[i] != null) {
@@ -87,7 +85,6 @@
     
     // Advance to the first key in each scanner.
     // All results will match the required column-set and scanTime.
-    
     for (int i = 0; i < scanners.length; i++) {
       keys[i] = new HStoreKey();
       resultSets[i] = new TreeMap<Text, byte []>();
@@ -95,9 +92,6 @@
         closeScanner(i);
       }
     }
-    // As we have now successfully completed initialization, increment the
-    // activeScanner count.
-    store.activeScanners.incrementAndGet();
   }
 
   /** @return true if the scanner is a wild card scanner */
@@ -265,18 +259,13 @@
 
   /** {@inheritDoc} */
   public void close() {
-    try {
     for(int i = 0; i < scanners.length; i++) {
       if(scanners[i] != null) {
         closeScanner(i);
       }
     }
-    } finally {
-      store.updateActiveScanners();
-    }
   }
 
-  /** {@inheritDoc} */
   public Iterator<Map.Entry<HStoreKey, SortedMap<Text, byte[]>>> iterator() {
     throw new UnsupportedOperationException("Unimplemented serverside. " +
       "next(HStoreKey, StortedMap(...) is more efficient");

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java?rev=651017&r1=651016&r2=651017&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java Wed Apr 23 12:21:26 2008
@@ -22,56 +22,40 @@
 
 import java.io.IOException;
 import java.util.SortedMap;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
-import org.apache.hadoop.io.MapFile;
-import org.apache.hadoop.io.Text;
 import org.apache.hadoop.hbase.HStoreKey;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.io.MapFile;
+import org.apache.hadoop.io.Text;
 
 /**
  * A scanner that iterates through HStore files
  */
-class StoreFileScanner extends HAbstractScanner {
+class StoreFileScanner extends HAbstractScanner
+implements ChangedReadersObserver {
     // Keys retrieved from the sources
   private HStoreKey keys[];
   // Values that correspond to those keys
   private byte [][] vals;
   
+  // Readers we go against.
   private MapFile.Reader[] readers;
-  private HStore store;
+  
+  // Store this scanner came out of.
+  private final HStore store;
+  
+  // Used around replacement of Readers if they change while we're scanning.
+  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
   
   public StoreFileScanner(final HStore store, final long timestamp,
     final Text[] targetCols, final Text firstRow)
   throws IOException {
     super(timestamp, targetCols);
     this.store = store;
+    this.store.addChangedReaderObserver(this);
     try {
-      this.readers = new MapFile.Reader[store.storefiles.size()];
-      
-      // Most recent map file should be first
-      int i = readers.length - 1;
-      for(HStoreFile curHSF: store.storefiles.values()) {
-        readers[i--] = curHSF.getReader(store.fs, store.bloomFilter);
-      }
-      
-      this.keys = new HStoreKey[readers.length];
-      this.vals = new byte[readers.length][];
-      
-      // Advance the readers to the first pos.
-      for(i = 0; i < readers.length; i++) {
-        keys[i] = new HStoreKey();
-        if(firstRow.getLength() != 0) {
-          if(findFirstRow(i, firstRow)) {
-            continue;
-          }
-        }
-        while(getNext(i)) {
-          if(columnMatch(i)) {
-            break;
-          }
-        }
-      }
-      
+      openReaders(firstRow);
     } catch (Exception ex) {
       close();
       IOException e = new IOException("HStoreScanner failed construction");
@@ -80,6 +64,46 @@
     }
   }
   
+  /*
+   * Go open new Reader iterators and cue them at <code>firstRow</code>.
+   * Closes existing Readers if any.
+   * @param firstRow
+   * @throws IOException
+   */
+  private void openReaders(final Text firstRow) throws IOException {
+    if (this.readers != null) {
+      for (int i = 0; i < this.readers.length; i++) {
+        this.readers[i].close();
+      }
+    }
+    // Open our own copies of the Readers here inside in the scanner.
+    this.readers = new MapFile.Reader[this.store.getStorefiles().size()];
+    
+    // Most recent map file should be first
+    int i = readers.length - 1;
+    for(HStoreFile curHSF: store.getStorefiles().values()) {
+      readers[i--] = curHSF.getReader(store.fs, store.bloomFilter);
+    }
+    
+    this.keys = new HStoreKey[readers.length];
+    this.vals = new byte[readers.length][];
+    
+    // Advance the readers to the first pos.
+    for (i = 0; i < readers.length; i++) {
+      keys[i] = new HStoreKey();
+      if (firstRow.getLength() != 0) {
+        if (findFirstRow(i, firstRow)) {
+          continue;
+        }
+      }
+      while (getNext(i)) {
+        if (columnMatch(i)) {
+          break;
+        }
+      }
+    }
+  }
+
   /**
    * For a particular column i, find all the matchers defined for the column.
    * Compare the column family and column key using the matchers. The first one
@@ -107,72 +131,104 @@
   @Override
   public boolean next(HStoreKey key, SortedMap<Text, byte []> results)
   throws IOException {
-    if (scannerClosed) {
+    if (this.scannerClosed) {
       return false;
     }
-    // Find the next row label (and timestamp)
-    Text chosenRow = null;
-    long chosenTimestamp = -1;
-    for(int i = 0; i < keys.length; i++) {
-      if((keys[i] != null)
-          && (columnMatch(i))
-          && (keys[i].getTimestamp() <= this.timestamp)
-          && ((chosenRow == null)
-              || (keys[i].getRow().compareTo(chosenRow) < 0)
-              || ((keys[i].getRow().compareTo(chosenRow) == 0)
-                  && (keys[i].getTimestamp() > chosenTimestamp)))) {
-        chosenRow = new Text(keys[i].getRow());
-        chosenTimestamp = keys[i].getTimestamp();
-      }
-    }
+    this.lock.readLock().lock();
+    try {
+      // Find the next viable row label (and timestamp).
+      ViableRow viableRow = getNextViableRow();
+      
+      // Grab all the values that match this row/timestamp
+      boolean insertedItem = false;
+      if (viableRow.getRow() != null) {
+        key.setRow(viableRow.getRow());
+        key.setVersion(viableRow.getTimestamp());
+        key.setColumn(new Text(""));
+
+        for (int i = 0; i < keys.length; i++) {
+          // Fetch the data
+          while ((keys[i] != null)
+              && (keys[i].getRow().compareTo(viableRow.getRow()) == 0)) {
+
+            // If we are doing a wild card match or there are multiple matchers
+            // per column, we need to scan all the older versions of this row
+            // to pick up the rest of the family members
+            if(!isWildcardScanner()
+                && !isMultipleMatchScanner()
+                && (keys[i].getTimestamp() != viableRow.getTimestamp())) {
+              break;
+            }
 
-    // Grab all the values that match this row/timestamp
-    boolean insertedItem = false;
-    if(chosenRow != null) {
-      key.setRow(chosenRow);
-      key.setVersion(chosenTimestamp);
-      key.setColumn(new Text(""));
-
-      for(int i = 0; i < keys.length; i++) {
-        // Fetch the data
-        while((keys[i] != null)
-            && (keys[i].getRow().compareTo(chosenRow) == 0)) {
-
-          // If we are doing a wild card match or there are multiple matchers
-          // per column, we need to scan all the older versions of this row
-          // to pick up the rest of the family members
-          
-          if(!isWildcardScanner()
-              && !isMultipleMatchScanner()
-              && (keys[i].getTimestamp() != chosenTimestamp)) {
-            break;
-          }
+            if(columnMatch(i)) {              
+              // We only want the first result for any specific family member
+              if(!results.containsKey(keys[i].getColumn())) {
+                results.put(new Text(keys[i].getColumn()), vals[i]);
+                insertedItem = true;
+              }
+            }
 
-          if(columnMatch(i)) {              
-            // We only want the first result for any specific family member
-            if(!results.containsKey(keys[i].getColumn())) {
-              results.put(new Text(keys[i].getColumn()), vals[i]);
-              insertedItem = true;
+            if (!getNext(i)) {
+              closeSubScanner(i);
             }
           }
 
-          if(!getNext(i)) {
-            closeSubScanner(i);
+          // Advance the current scanner beyond the chosen row, to
+          // a valid timestamp, so we're ready next time.
+          while ((keys[i] != null)
+              && ((keys[i].getRow().compareTo(viableRow.getRow()) <= 0)
+                  || (keys[i].getTimestamp() > this.timestamp)
+                  || (! columnMatch(i)))) {
+            getNext(i);
           }
         }
+      }
+      return insertedItem;
+    } finally {
+      this.lock.readLock().unlock();
+    }
+  }
+  
+  // Data stucture to hold next, viable row (and timestamp).
+  class ViableRow {
+    private final Text row;
+    private final long ts;
+
+    ViableRow(final Text r, final long t) {
+      this.row = r;
+      this.ts = t;
+    }
 
-        // Advance the current scanner beyond the chosen row, to
-        // a valid timestamp, so we're ready next time.
-        
-        while((keys[i] != null)
-            && ((keys[i].getRow().compareTo(chosenRow) <= 0)
-                || (keys[i].getTimestamp() > this.timestamp)
-                || (! columnMatch(i)))) {
-          getNext(i);
-        }
+    public Text getRow() {
+      return this.row;
+    }
+
+    public long getTimestamp() {
+      return this.ts;
+    }
+  }
+  
+  /*
+   * @return An instance of <code>ViableRow</code>
+   * @throws IOException
+   */
+  private ViableRow getNextViableRow() throws IOException {
+    // Find the next viable row label (and timestamp).
+    Text viableRow = null;
+    long viableTimestamp = -1;
+    for(int i = 0; i < keys.length; i++) {
+      if((keys[i] != null)
+          && (columnMatch(i))
+          && (keys[i].getTimestamp() <= this.timestamp)
+          && ((viableRow == null)
+              || (keys[i].getRow().compareTo(viableRow) < 0)
+              || ((keys[i].getRow().compareTo(viableRow) == 0)
+                  && (keys[i].getTimestamp() > viableTimestamp)))) {
+        viableRow = new Text(keys[i].getRow());
+        viableTimestamp = keys[i].getTimestamp();
       }
     }
-    return insertedItem;
+    return new ViableRow(viableRow, viableTimestamp);
   }
 
   /**
@@ -242,7 +298,8 @@
 
   /** Shut it down! */
   public void close() {
-    if(! scannerClosed) {
+    if (!this.scannerClosed) {
+      this.store.deleteChangedReaderObserver(this);
       try {
         for(int i = 0; i < readers.length; i++) {
           if(readers[i] != null) {
@@ -255,8 +312,23 @@
         }
         
       } finally {
-        scannerClosed = true;
+        this.scannerClosed = true;
       }
+    }
+  }
+
+  // Implementation of ChangedReadersObserver
+  public void updateReaders() throws IOException {
+    this.lock.writeLock().lock();
+    try {
+      // The keys are currently lined up at the next row to fetch.  Pass in
+      // the current row as 'first' row and readers will be opened and cue'd
+      // up so future call to next will start here.
+      ViableRow viableRow = getNextViableRow();
+      openReaders(viableRow.getRow());
+      LOG.debug("Replaced Scanner Readers at row " + viableRow.getRow());
+    } finally {
+      this.lock.writeLock().unlock();
     }
   }
 }

Modified: hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/MultiRegionTable.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/MultiRegionTable.java?rev=651017&r1=651016&r2=651017&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/MultiRegionTable.java (original)
+++ hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/MultiRegionTable.java Wed Apr 23 12:21:26 2008
@@ -127,7 +127,7 @@
     assertNotNull(r);
 
     // Flush the cache
-    server.getCacheFlushListener().flushRequested(r);
+    server.getFlushRequester().request(r);
 
     // Now, wait until split makes it into the meta table.
     int oldCount = count;


