hbase-commits mailing list archives

From raw...@apache.org
Subject svn commit: r944527 - in /hadoop/hbase/trunk/core/src: main/java/org/apache/hadoop/hbase/ main/java/org/apache/hadoop/hbase/regionserver/ test/java/org/apache/hadoop/hbase/regionserver/
Date Sat, 15 May 2010 00:16:41 GMT
Author: rawson
Date: Sat May 15 00:16:40 2010
New Revision: 944527

URL: http://svn.apache.org/viewvc?rev=944527&view=rev
Log:
The core elements of HBASE-2037: refactoring flushing, and adding configurability in which
HRegion subclass is instantiated
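
In outline: a deployment can now name an HRegion subclass through the new
hbase.hregion.impl property, and HRegion.newHRegion() instantiates that class
reflectively via its (Path, HLog, FileSystem, Configuration, HRegionInfo,
FlushRequester) constructor.  A minimal sketch of what this enables follows;
MyRegion and its override body are hypothetical illustrations, not part of
this patch (imports elided):

  // Hypothetical subclass: the constructor signature and the
  // internalPreFlushcacheCommit() hook are the extension points
  // this patch exposes.
  public class MyRegion extends HRegion {
    public MyRegion(Path basedir, HLog log, FileSystem fs,
        Configuration conf, HRegionInfo regionInfo,
        FlushRequester flushListener) {
      super(basedir, log, fs, conf, regionInfo, flushListener);
    }

    @Override
    protected void internalPreFlushcacheCommit() throws IOException {
      // custom work before the flush commit stage
    }
  }

  // Selecting the subclass (the key is HConstants.REGION_IMPL):
  conf.setClass("hbase.hregion.impl", MyRegion.class, HRegion.class);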


Added:
    hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java
Modified:
    hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/HConstants.java
    hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/HMerge.java
    hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
    hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
    hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
    hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/regionserver/ScanDeleteTracker.java
    hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
    hadoop/hbase/trunk/core/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
    hadoop/hbase/trunk/core/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java

Modified: hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/HConstants.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/HConstants.java?rev=944527&r1=944526&r2=944527&view=diff
==============================================================================
--- hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/HConstants.java (original)
+++ hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/HConstants.java Sat May 15 00:16:40 2010
@@ -1,5 +1,5 @@
 /**
- * Copyright 2007 The Apache Software Foundation
+ * Copyright 2010 The Apache Software Foundation
  *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -286,6 +286,8 @@ public interface HConstants {
    */
   public static int RETRY_BACKOFF[] = { 1, 1, 1, 2, 2, 4, 4, 8, 16, 32 };
 
+  public static final String REGION_IMPL = "hbase.hregion.impl";
+
   /** modifyTable op for replacing the table descriptor */
   public static enum Modify {
     CLOSE_REGION,

Modified: hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/HMerge.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/HMerge.java?rev=944527&r1=944526&r2=944527&view=diff
==============================================================================
--- hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/HMerge.java (original)
+++ hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/HMerge.java Sat May 15 00:16:40 2010
@@ -19,12 +19,6 @@
  */
 package org.apache.hadoop.hbase;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.NoSuchElementException;
-import java.util.Random;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -44,6 +38,12 @@ import org.apache.hadoop.hbase.regionser
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Writables;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Random;
+
 /**
  * A non-instantiable class that has a static method capable of compacting
  * a table by merging adjacent regions.
@@ -152,12 +152,12 @@ class HMerge implements HConstants {
       for (int i = 0; i < info.length - 1; i++) {
         if (currentRegion == null) {
           currentRegion =
-            new HRegion(tabledir, hlog, fs, conf, info[i], null);
+            HRegion.newHRegion(tabledir, hlog, fs, conf, info[i], null);
           currentRegion.initialize(null, null);
           currentSize = currentRegion.getLargestHStoreSize();
         }
         nextRegion =
-          new HRegion(tabledir, hlog, fs, conf, info[i + 1], null);
+          HRegion.newHRegion(tabledir, hlog, fs, conf, info[i + 1], null);
         nextRegion.initialize(null, null);
         nextSize = nextRegion.getLargestHStoreSize();
 
@@ -326,7 +326,7 @@ class HMerge implements HConstants {
 
       // Scan root region to find all the meta regions
 
-      root = new HRegion(rootTableDir, hlog, fs, conf,
+      root = HRegion.newHRegion(rootTableDir, hlog, fs, conf,
           HRegionInfo.ROOT_REGIONINFO, null);
       root.initialize(null, null);
 

Modified: hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=944527&r1=944526&r2=944527&view=diff
==============================================================================
--- hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Sat May 15 00:16:40 2010
@@ -19,22 +19,6 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
- import java.io.IOException;
-import java.io.UnsupportedEncodingException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.NavigableSet;
-import java.util.Random;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.TreeSet;
-import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -70,6 +54,23 @@ import org.apache.hadoop.hbase.util.Writ
 import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.util.StringUtils;
 
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.lang.reflect.Constructor;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableSet;
+import java.util.Random;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
 /**
  * HRegion stores data for a certain region of a table.  It stores all columns
  * for each row. A given table consists of one or more HRegions.
@@ -238,7 +239,10 @@ public class HRegion implements HConstan
   }
 
   /**
-   * HRegion constructor.
+   * HRegion constructor.  This constructor should only be used for testing and
+   * extensions.  Instances of HRegion should be instantiated with the
+   * {@link org.apache.hadoop.hbase.regionserver.HRegion#newHRegion(org.apache.hadoop.fs.Path, HLog, org.apache.hadoop.fs.FileSystem, org.apache.hadoop.hbase.HBaseConfiguration, org.apache.hadoop.hbase.HRegionInfo, FlushRequester)} method.
+   *
    *
    * @param basedir qualified path of directory where region should be located,
    * usually the table directory.
@@ -256,6 +260,9 @@ public class HRegion implements HConstan
    * @param flushListener an object that implements CacheFlushListener or null
    * making progress to master -- otherwise master might think region deploy
    * failed.  Can be null.
+   *
+   * @see org.apache.hadoop.hbase.regionserver.HRegion#newHRegion(org.apache.hadoop.fs.Path, HLog, org.apache.hadoop.fs.FileSystem, org.apache.hadoop.hbase.HBaseConfiguration, org.apache.hadoop.hbase.HRegionInfo, FlushRequester)
+
    */
   public HRegion(Path basedir, HLog log, FileSystem fs, Configuration conf,
       HRegionInfo regionInfo, FlushRequester flushListener) {
@@ -686,10 +693,10 @@ public class HRegion implements HConstan
       // Create a region instance and then move the splits into place under
       // regionA and regionB.
       HRegion regionA =
-        new HRegion(basedir, log, fs, conf, regionAInfo, null);
+        HRegion.newHRegion(basedir, log, fs, conf, regionAInfo, null);
       moveInitialFilesIntoPlace(this.fs, dirA, regionA.getRegionDir());
       HRegion regionB =
-        new HRegion(basedir, log, fs, conf, regionBInfo, null);
+        HRegion.newHRegion(basedir, log, fs, conf, regionBInfo, null);
       moveInitialFilesIntoPlace(this.fs, dirB, regionB.getRegionDir());
 
       HRegion regions[] = new HRegion [] {regionA, regionB};
@@ -927,7 +934,7 @@ public class HRegion implements HConstan
    * @throws DroppedSnapshotException Thrown when replay of hlog is required
    * because a Snapshot was not properly persisted.
    */
-  private boolean internalFlushcache() throws IOException {
+  protected boolean internalFlushcache() throws IOException {
     final long startTime = System.currentTimeMillis();
     // Clear flush flag.
     // Record latest flush time
@@ -954,12 +961,19 @@ public class HRegion implements HConstan
     this.updatesLock.writeLock().lock();
     // Get current size of memstores.
     final long currentMemStoreSize = this.memstoreSize.get();
+    List<StoreFlusher> storeFlushers = new ArrayList<StoreFlusher>();
     try {
-      for (Store s: stores.values()) {
-        s.snapshot();
-      }
       sequenceId = log.startCacheFlush();
       completeSequenceId = this.getCompleteCacheFlushSequenceId(sequenceId);
+
+      for (Store s : stores.values()) {
+        storeFlushers.add(s.getStoreFlusher(completeSequenceId));
+      }
+
+      // prepare flush (take a snapshot)
+      for (StoreFlusher flusher : storeFlushers) {
+        flusher.prepare();
+      }
     } finally {
       this.updatesLock.writeLock().unlock();
     }
@@ -973,12 +987,25 @@ public class HRegion implements HConstan
       // A.  Flush memstore to all the HStores.
       // Keep running vector of all store files that includes both old and the
       // just-made new flush store file.
-      for (Store hstore: stores.values()) {
-        boolean needsCompaction = hstore.flushCache(completeSequenceId);
+
+      for (StoreFlusher flusher : storeFlushers) {
+        flusher.flushCache();
+      }
+
+      internalPreFlushcacheCommit();
+
+      /*
+       * Switch between memstore and the new store file(s).
+       */
+      for (StoreFlusher flusher : storeFlushers) {
+        boolean needsCompaction = flusher.commit();
         if (needsCompaction) {
           compactionRequested = true;
         }
       }
+
+      storeFlushers.clear();
+      
       // Set down the memstore size by amount of flush.
       this.memstoreSize.addAndGet(-currentMemStoreSize);
     } catch (Throwable t) {
@@ -1023,6 +1050,15 @@ public class HRegion implements HConstan
   }
 
   /**
+   * A hook for sub-classes wishing to perform operations prior to the
+   * cache flush commit stage.
+   *
+   * @throws IOException allow children to throw exception
+   */
+  protected void internalPreFlushcacheCommit() throws IOException {
+  }
+  
+  /**
    * Get the sequence number to be associated with this cache flush. Used by
    * TransactionalRegion to not complete pending transactions.
    *
@@ -1117,13 +1153,18 @@ public class HRegion implements HConstan
           scan.addFamily(family);
         }
       }
-      return new RegionScanner(scan, additionalScanners);
+      return instantiateInternalScanner(scan, additionalScanners);
 
     } finally {
       newScannerLock.readLock().unlock();
     }
   }
 
+  protected InternalScanner instantiateInternalScanner(Scan scan,
+      List<KeyValueScanner> additionalScanners) throws IOException {
+    return new RegionScanner(scan, additionalScanners);
+  }
+
   //////////////////////////////////////////////////////////////////////////////
   // set() methods for client use.
   //////////////////////////////////////////////////////////////////////////////
@@ -1976,6 +2017,45 @@ public class HRegion implements HConstan
   }
 
   // Utility methods
+  /**
+   * A utility method to create new instances of HRegion based on the
+   * {@link org.apache.hadoop.hbase.HConstants#REGION_IMPL} configuration
+   * property.
+   * @param basedir qualified path of directory where region should be located,
+   * usually the table directory.
+   * @param log The HLog is the outbound log for any updates to the HRegion
+   * (There's a single HLog for all the HRegions on a single HRegionServer.)
+   * The log file is a logfile from the previous execution that's
+   * custom-computed for this HRegion. The HRegionServer computes and sorts the
+   * appropriate log info for this HRegion. If there is a previous log file
+   * (implying that the HRegion has been written-to before), then read it from
+   * the supplied path.
+   * @param fs is the filesystem.
+   * @param conf is global configuration settings.
+   * @param regionInfo HRegionInfo that describes the region
+   * @param flushListener an object that implements CacheFlushListener,
+   * used to request flushes when the memstore grows too large.  Can be null.
+   * @return the new instance
+   */
+  public static HRegion newHRegion(Path basedir, HLog log, FileSystem fs, Configuration conf,
+                                   HRegionInfo regionInfo, FlushRequester flushListener) {
+    try {
+      @SuppressWarnings("unchecked")
+      Class<? extends HRegion> regionClass =
+          (Class<? extends HRegion>) conf.getClass(HConstants.REGION_IMPL, HRegion.class);
+
+      Constructor<? extends HRegion> c =
+          regionClass.getConstructor(Path.class, HLog.class, FileSystem.class,
+              Configuration.class, HRegionInfo.class, FlushRequester.class);
+
+      return c.newInstance(basedir, log, fs, conf, regionInfo, flushListener);
+    } catch (Throwable e) {
+      // todo: what should I throw here?
+      throw new IllegalStateException("Could not instantiate a region instance.", e);
+    }
+  }
 
   /**
    * Convenience method creating new HRegions. Used by createTable and by the
@@ -1998,7 +2078,7 @@ public class HRegion implements HConstan
     Path regionDir = HRegion.getRegionDir(tableDir, info.getEncodedName());
     FileSystem fs = FileSystem.get(conf);
     fs.mkdirs(regionDir);
-    HRegion region = new HRegion(tableDir,
+    HRegion region = HRegion.newHRegion(tableDir,
       new HLog(fs, new Path(regionDir, HREGION_LOGDIR_NAME),
           new Path(regionDir, HREGION_OLDLOGDIR_NAME), conf, null),
       fs, conf, info, null);
@@ -2028,7 +2108,7 @@ public class HRegion implements HConstan
     if (info == null) {
       throw new NullPointerException("Passed region info is null");
     }
-    HRegion r = new HRegion(
+    HRegion r = HRegion.newHRegion(
         HTableDescriptor.getTableDir(rootDir, info.getTableDesc().getName()),
         log, FileSystem.get(conf), conf, info, null);
     r.initialize(null, null);
@@ -2335,7 +2415,7 @@ public class HRegion implements HConstan
       LOG.debug("Files for new region");
       listPaths(fs, newRegionDir);
     }
-    HRegion dstRegion = new HRegion(basedir, log, fs, conf, newRegionInfo, null);
+    HRegion dstRegion = HRegion.newHRegion(basedir, log, fs, conf, newRegionInfo, null);
     dstRegion.initialize(null, null);
     dstRegion.compactStores();
     if (LOG.isDebugEnabled()) {
@@ -2592,9 +2672,9 @@ public class HRegion implements HConstan
     String metaStr = Bytes.toString(HConstants.META_TABLE_NAME);
     // Currently expects tables have one region only.
     if (p.getName().startsWith(rootStr)) {
-      region = new HRegion(p, log, fs, c, HRegionInfo.ROOT_REGIONINFO, null);
+      region = HRegion.newHRegion(p, log, fs, c, HRegionInfo.ROOT_REGIONINFO, null);
     } else if (p.getName().startsWith(metaStr)) {
-      region = new HRegion(p, log, fs, c, HRegionInfo.FIRST_META_REGIONINFO,
+      region = HRegion.newHRegion(p, log, fs, c, HRegionInfo.FIRST_META_REGIONINFO,
           null);
     } else {
       throw new IOException("Not a known catalog table: " + p.toString());
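
Taken together, the changes above reduce a region flush to a three-phase
protocol driven once per store.  A condensed sketch of the new
internalFlushcache() flow (paraphrased from the diff above, not a verbatim
excerpt):

  // Phase 0 (short, writes paused): hand each store a flusher and
  // snapshot its memstore under the updates write lock.
  List<StoreFlusher> storeFlushers = new ArrayList<StoreFlusher>();
  this.updatesLock.writeLock().lock();
  try {
    long sequenceId = log.startCacheFlush();
    long completeSequenceId = getCompleteCacheFlushSequenceId(sequenceId);
    for (Store s : stores.values()) {
      storeFlushers.add(s.getStoreFlusher(completeSequenceId));
    }
    for (StoreFlusher flusher : storeFlushers) {
      flusher.prepare();
    }
  } finally {
    this.updatesLock.writeLock().unlock();
  }

  // Phase 1 (long, writes resumed): write out the new store files.
  for (StoreFlusher flusher : storeFlushers) {
    flusher.flushCache();
  }

  // Phase 2 (short): run the new subclass hook, then commit each flush,
  // swapping the new file(s) in and clearing the memstore snapshots.
  internalPreFlushcacheCommit();
  boolean compactionRequested = false;
  for (StoreFlusher flusher : storeFlushers) {
    if (flusher.commit()) {
      compactionRequested = true;
    }
  }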

Modified: hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=944527&r1=944526&r2=944527&view=diff
==============================================================================
--- hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Sat May 15 00:16:40 2010
@@ -1520,7 +1520,7 @@ public class HRegionServer implements HC
 
   protected HRegion instantiateRegion(final HRegionInfo regionInfo)
       throws IOException {
-    HRegion r = new HRegion(HTableDescriptor.getTableDir(rootDir, regionInfo
+    HRegion r = HRegion.newHRegion(HTableDescriptor.getTableDir(rootDir, regionInfo
         .getTableDesc().getName()), this.hlog, this.fs, conf, regionInfo,
         this.cacheFlusher);
     r.initialize(null,  new Progressable() {

Modified: hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java?rev=944527&r1=944526&r2=944527&view=diff
==============================================================================
--- hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java (original)
+++ hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java Sat May 15 00:16:40 2010
@@ -116,7 +116,7 @@ public class MemStore implements HeapSiz
 
   /**
    * Creates a snapshot of the current memstore.
-   * Snapshot must be cleared by call to {@link #clearSnapshot(java.util.Map)}
+   * Snapshot must be cleared by call to {@link #clearSnapshot(SortedSet<KeyValue>)}
    * To get the snapshot made by this method, use {@link #getSnapshot()}
    */
   void snapshot() {
@@ -156,7 +156,7 @@ public class MemStore implements HeapSiz
    * call to {@link #snapshot()}
    * @return Return snapshot.
    * @see {@link #snapshot()}
-   * @see {@link #clearSnapshot(java.util.Map)}
+   * @see {@link #clearSnapshot(SortedSet<KeyValue>)}
    */
   KeyValueSkipListSet getSnapshot() {
     return this.snapshot;
@@ -168,7 +168,7 @@ public class MemStore implements HeapSiz
    * @throws UnexpectedException
    * @see {@link #snapshot()}
    */
-  void clearSnapshot(final KeyValueSkipListSet ss)
+  void clearSnapshot(final SortedSet<KeyValue> ss)
   throws UnexpectedException {
     this.lock.writeLock().lock();
     try {

Modified: hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/regionserver/ScanDeleteTracker.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/regionserver/ScanDeleteTracker.java?rev=944527&r1=944526&r2=944527&view=diff
==============================================================================
--- hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/regionserver/ScanDeleteTracker.java (original)
+++ hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/regionserver/ScanDeleteTracker.java Sat May 15 00:16:40 2010
@@ -104,7 +104,7 @@ public class ScanDeleteTracker implement
   @Override
   public boolean isDeleted(byte [] buffer, int qualifierOffset,
       int qualifierLength, long timestamp) {
-    if (timestamp < familyStamp) {
+    if (timestamp <= familyStamp) {
       return true;
     }
 

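The one-character change above moves a boundary: a family-wide delete now
also masks cells stamped at exactly the delete's own timestamp.  A
standalone illustration of the predicate (plain Java, not the tracker's
API):

  long familyStamp = 100L;  // family delete written at t=100
  long putStamp    = 100L;  // put written at exactly t=100

  boolean maskedBefore = putStamp <  familyStamp;  // false - the put survived
  boolean maskedAfter  = putStamp <= familyStamp;  // true  - now masked
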
Modified: hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java?rev=944527&r1=944526&r2=944527&view=diff
==============================================================================
--- hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (original)
+++ hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java Sat May 15 00:16:40 2010
@@ -58,6 +58,7 @@ import java.util.Map;
 import java.util.NavigableMap;
 import java.util.NavigableSet;
 import java.util.Set;
+import java.util.SortedSet;
 import java.util.TreeSet;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.CopyOnWriteArraySet;
@@ -380,8 +381,11 @@ public class Store implements HConstants
 
     if (maxSeqIdInLog > -1) {
       // We read some edits, so we should flush the memstore
-      this.snapshot();
-      boolean needCompaction = this.flushCache(maxSeqIdInLog);
+      StoreFlusher flusher = getStoreFlusher(maxSeqIdInLog);
+      flusher.prepare();
+      flusher.flushCache();
+      boolean needCompaction = flusher.commit();
+
       if (needCompaction) {
         this.compact(false);
       }
@@ -499,7 +503,7 @@ public class Store implements HConstants
 
   /**
    * Snapshot this stores memstore.  Call before running
-   * {@link #flushCache(long)} so it has some work to do.
+   * {@link #flushCache(long, SortedSet<KeyValue>)} so it has some work to do.
    */
   void snapshot() {
     this.memstore.snapshot();
@@ -512,21 +516,13 @@ public class Store implements HConstants
    * @return true if a compaction is needed
    * @throws IOException
    */
-  boolean flushCache(final long logCacheFlushId) throws IOException {
-    // Get the snapshot to flush.  Presumes that a call to
-    // this.memstore.snapshot() has happened earlier up in the chain.
-    KeyValueSkipListSet snapshot = this.memstore.getSnapshot();
+  private StoreFile flushCache(final long logCacheFlushId,
+                               SortedSet<KeyValue> snapshot) throws IOException {
     // If an exception happens flushing, we let it out without clearing
     // the memstore snapshot.  The old snapshot will be returned when we say
     // 'snapshot', the next time flush comes around.
-    StoreFile sf = internalFlushCache(snapshot, logCacheFlushId);
-    if (sf == null) {
-      return false;
-    }
-    // Add new file to store files.  Clear snapshot too while we have the
-    // Store write lock.
-    int size = updateStorefiles(logCacheFlushId, sf, snapshot);
-    return size >= this.compactionThreshold;
+    return internalFlushCache(snapshot, logCacheFlushId);
+
   }
 
   /*
@@ -535,7 +531,7 @@ public class Store implements HConstants
    * @return StoreFile created.
    * @throws IOException
    */
-  private StoreFile internalFlushCache(final KeyValueSkipListSet set,
+  private StoreFile internalFlushCache(final SortedSet<KeyValue> set,
     final long logCacheFlushId)
   throws IOException {
     HFile.Writer writer = null;
@@ -605,20 +601,18 @@ public class Store implements HConstants
    * @param sf
    * @param set That was used to make the passed file <code>p</code>.
    * @throws IOException
-   * @return Count of store files.
+   * @return Whether compaction is required.
    */
-  private int updateStorefiles(final long logCacheFlushId,
-    final StoreFile sf, final KeyValueSkipListSet set)
+  private boolean updateStorefiles(final long logCacheFlushId,
+    final StoreFile sf, final SortedSet<KeyValue> set)
   throws IOException {
-    int count = 0;
     this.lock.writeLock().lock();
     try {
       this.storefiles.put(Long.valueOf(logCacheFlushId), sf);
-      count = this.storefiles.size();
       // Tell listeners of the change in readers.
       notifyChangedReadersObservers();
       this.memstore.clearSnapshot(set);
-      return count;
+      return this.storefiles.size() >= this.compactionThreshold;
     } finally {
       this.lock.writeLock().unlock();
     }
@@ -1513,6 +1507,42 @@ public class Store implements HConstants
     }
   }
 
+  public StoreFlusher getStoreFlusher(long cacheFlushId) {
+    return new StoreFlusherImpl(cacheFlushId);
+  }
+
+  private class StoreFlusherImpl implements StoreFlusher {
+
+    private long cacheFlushId;
+    private SortedSet<KeyValue> snapshot;
+    private StoreFile storeFile;
+
+    private StoreFlusherImpl(long cacheFlushId) {
+      this.cacheFlushId = cacheFlushId;
+    }
+
+    @Override
+    public void prepare() {
+      memstore.snapshot();
+      this.snapshot = memstore.getSnapshot();
+    }
+
+    @Override
+    public void flushCache() throws IOException {
+      storeFile = Store.this.flushCache(cacheFlushId, snapshot);
+    }
+
+    @Override
+    public boolean commit() throws IOException {
+      if (storeFile == null) {
+        return false;
+      }
+      // Add new file to store files.  Clear snapshot too while we have 
+      // the Store write lock.
+      return Store.this.updateStorefiles(cacheFlushId, storeFile, snapshot);
+    }
+  }
+
   /**
    * See if there's too much store files in this store
    * @return true if number of store files is greater than

Added: hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java?rev=944527&view=auto
==============================================================================
--- hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java (added)
+++ hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java Sat May 15 00:16:40 2010
@@ -0,0 +1,63 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+
+/**
+ * A package-protected interface for store flushing.
+ * A store flusher carries the state required to prepare/flush/commit the
+ * store's cache.
+ */
+interface StoreFlusher {
+
+  /**
+   * Prepare for a store flush (create snapshot)
+   *
+   * Requires pausing writes.
+   *
+   * A very short operation.
+   */
+  void prepare();
+
+  /**
+   * Flush the cache (create the new store file)
+   *
+   * A lengthy operation which doesn't require locking out any other
+   * function of the store.
+   *
+   * @throws IOException in case the flush fails
+   */
+  void flushCache() throws IOException;
+
+  /**
+   * Commit the flush - add the store file to the store and clear the
+   * memstore snapshot.
+   *
+   * Requires pausing scans.
+   *
+   * A very short operation.
+   *
+   * @return true if a compaction is needed
+   * @throws IOException in case the commit fails
+   */
+  boolean commit() throws IOException;
+
+}
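
Callers are expected to drive a flusher through the three calls in order,
as both the rewritten replay path in Store and the flushStore() helper in
TestStore below do.  A minimal sketch (assumes package access to Store):

  StoreFlusher flusher = store.getStoreFlusher(maxSeqIdInLog);
  flusher.prepare();                          // short: snapshot the memstore
  flusher.flushCache();                       // long: write the new store file
  boolean needCompaction = flusher.commit();  // short: publish the file,
                                              // clear the snapshot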

Modified: hadoop/hbase/trunk/core/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/core/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java?rev=944527&r1=944526&r2=944527&view=diff
==============================================================================
--- hadoop/hbase/trunk/core/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java (original)
+++ hadoop/hbase/trunk/core/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java Sat May 15 00:16:40 2010
@@ -1,5 +1,5 @@
 /**
- * Copyright 2007 The Apache Software Foundation
+ * Copyright 2010 The Apache Software Foundation
  *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -19,13 +19,6 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.Path;
@@ -41,13 +34,26 @@ import org.apache.hadoop.hbase.client.Ge
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.BinaryComparator;
 import org.apache.hadoop.hbase.filter.ColumnCountGetFilter;
+import org.apache.hadoop.hbase.filter.CompareFilter;
+import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
+import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.filter.FilterList;
 import org.apache.hadoop.hbase.filter.PrefixFilter;
+import org.apache.hadoop.hbase.filter.RowFilter;
 import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
-import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
 import org.apache.hadoop.hbase.regionserver.HRegion.RegionScanner;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Assert;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
 
 /**
  * Basic stand-alone testing of HRegion.
@@ -64,13 +70,13 @@ public class TestHRegion extends HBaseTe
   private final int MAX_VERSIONS = 2;
 
   // Test names
-  private final byte[] tableName = Bytes.toBytes("testtable");;
-  private final byte[] qual1 = Bytes.toBytes("qual1");
-  private final byte[] qual2 = Bytes.toBytes("qual2");
-  private final byte[] qual3 = Bytes.toBytes("qual3");
-  private final byte[] value1 = Bytes.toBytes("value1");
-  private final byte[] value2 = Bytes.toBytes("value2");
-  private final byte [] row = Bytes.toBytes("rowA");
+  protected final byte[] tableName = Bytes.toBytes("testtable");
+  protected final byte[] qual1 = Bytes.toBytes("qual1");
+  protected final byte[] qual2 = Bytes.toBytes("qual2");
+  protected final byte[] qual3 = Bytes.toBytes("qual3");
+  protected final byte[] value1 = Bytes.toBytes("value1");
+  protected final byte[] value2 = Bytes.toBytes("value2");
+  protected final byte [] row = Bytes.toBytes("rowA");
 
   /**
    * @see org.apache.hadoop.hbase.HBaseTestCase#setUp()
@@ -1799,6 +1805,378 @@ public class TestHRegion extends HBaseTe
     }
   }
 
+
+  /**
+   * Flushes the cache in a thread while scanning. The tests verify that the
+   * scan is coherent - i.e. the returned results are always of the same or
+   * a later update than the previous results.
+   * @throws IOException scan / compact
+   * @throws InterruptedException thread join
+   */
+  public void testFlushCacheWhileScanning() throws IOException, InterruptedException {
+    byte[] tableName = Bytes.toBytes("testFlushCacheWhileScanning");
+    byte[] family = Bytes.toBytes("family");
+    int numRows = 1000;
+    int flushAndScanInterval = 10;
+    int compactInterval = 10 * flushAndScanInterval;
+
+    String method = "testFlushCacheWhileScanning";
+    initHRegion(tableName,method, family);
+    FlushThread flushThread = new FlushThread();
+    flushThread.start();
+
+    Scan scan = new Scan();
+    scan.addFamily(family);
+    scan.setFilter(new SingleColumnValueFilter(family, qual1,
+      CompareFilter.CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes(5L))));
+
+    int expectedCount = 0;
+    List<KeyValue> res = new ArrayList<KeyValue>();
+
+    boolean toggle=true;
+    for (long i = 0; i < numRows; i++) {
+      Put put = new Put(Bytes.toBytes(i));
+      put.add(family, qual1, Bytes.toBytes(i % 10));
+      region.put(put);
+
+      if (i != 0 && i % compactInterval == 0) {
+        //System.out.println("iteration = " + i);
+        region.compactStores(true);
+      }
+
+      if (i % 10 == 5L) {
+        expectedCount++;
+      }
+
+      if (i != 0 && i % flushAndScanInterval == 0) {
+        res.clear();
+        InternalScanner scanner = region.getScanner(scan);
+        if (toggle) {
+          flushThread.flush();
+        }
+        while (scanner.next(res)) ;
+        if (!toggle) {
+          flushThread.flush();
+        }
+        Assert.assertEquals("i=" + i, expectedCount, res.size());
+        toggle = !toggle;
+      }
+    }
+
+    flushThread.done();
+    flushThread.join();
+    flushThread.checkNoError();
+  }
+
+  protected class FlushThread extends Thread {
+    private volatile boolean done;
+    private Throwable error = null;
+
+    public void done() {
+      done = true;
+      synchronized (this) {
+        interrupt();
+      }
+    }
+
+    public void checkNoError() {
+      if (error != null) {
+        Assert.assertNull(error);
+      }
+    }
+
+    @Override
+    public void run() {
+      done = false;
+      while (!done) {
+        synchronized (this) {
+          try {
+            wait();
+          } catch (InterruptedException ignored) {
+            if (done) {
+              break;
+            }
+          }
+        }
+        try {
+          region.flushcache();
+        } catch (IOException e) {
+          if (!done) {
+            LOG.error("Error while flushing cache", e);
+            error = e;
+          }
+          break;
+        }
+      }
+
+    }
+
+    public void flush() {
+      synchronized (this) {
+        notify();
+      }
+
+    }
+  }
+
+  /**
+   * Writes very wide records and scans for the latest every time.
+   * Flushes and compacts the region every now and then to keep things
+   * realistic.
+   *
+   * @throws IOException          by flush / scan / compaction
+   * @throws InterruptedException when joining threads
+   */
+  public void testWritesWhileScanning()
+    throws IOException, InterruptedException {
+    byte[] tableName = Bytes.toBytes("testWritesWhileScanning");
+    int testCount = 100;
+    int numRows = 1;
+    int numFamilies = 10;
+    int numQualifiers = 100;
+    int flushInterval = 7;
+    int compactInterval = 5 * flushInterval;
+    byte[][] families = new byte[numFamilies][];
+    for (int i = 0; i < numFamilies; i++) {
+      families[i] = Bytes.toBytes("family" + i);
+    }
+    byte[][] qualifiers = new byte[numQualifiers][];
+    for (int i = 0; i < numQualifiers; i++) {
+      qualifiers[i] = Bytes.toBytes("qual" + i);
+    }
+
+    String method = "testWritesWhileScanning";
+    initHRegion(tableName, method, families);
+    PutThread putThread = new PutThread(numRows, families, qualifiers);
+    putThread.start();
+    FlushThread flushThread = new FlushThread();
+    flushThread.start();
+
+    Scan scan = new Scan();
+    scan.setFilter(new RowFilter(CompareFilter.CompareOp.EQUAL,
+      new BinaryComparator(Bytes.toBytes("row0"))));
+
+    int expectedCount = numFamilies * numQualifiers;
+    List<KeyValue> res = new ArrayList<KeyValue>();
+
+    long prevTimestamp = 0L;
+    for (int i = 0; i < testCount; i++) {
+
+      if (i != 0 && i % compactInterval == 0) {
+        region.compactStores(true);
+      }
+
+      if (i != 0 && i % flushInterval == 0) {
+        //System.out.println("scan iteration = " + i);
+        flushThread.flush();
+      }
+
+      boolean previousEmpty = res.isEmpty();
+      res.clear();
+      InternalScanner scanner = region.getScanner(scan);
+      while (scanner.next(res)) ;
+      if (!res.isEmpty() || !previousEmpty || i > compactInterval) {
+        Assert.assertEquals("i=" + i, expectedCount, res.size());
+        long timestamp = res.get(0).getTimestamp();
+        Assert.assertTrue(timestamp >= prevTimestamp);
+        prevTimestamp = timestamp;
+      }
+    }
+
+    putThread.done();
+    putThread.join();
+    putThread.checkNoError();
+
+    flushThread.done();
+    flushThread.join();
+    flushThread.checkNoError();
+  }
+
+  protected class PutThread extends Thread {
+    private volatile boolean done;
+    private Throwable error = null;
+    private int numRows;
+    private byte[][] families;
+    private byte[][] qualifiers;
+
+    private PutThread(int numRows, byte[][] families,
+      byte[][] qualifiers) {
+      this.numRows = numRows;
+      this.families = families;
+      this.qualifiers = qualifiers;
+    }
+
+    public void done() {
+      done = true;
+      synchronized (this) {
+        interrupt();
+      }
+    }
+
+    public void checkNoError() {
+      if (error != null) {
+        Assert.assertNull(error);
+      }
+    }
+
+    @Override
+    public void run() {
+      done = false;
+      int val = 0;
+      while (!done) {
+        try {
+          for (int r = 0; r < numRows; r++) {
+            byte[] row = Bytes.toBytes("row" + r);
+            Put put = new Put(row);
+            for (int f = 0; f < families.length; f++) {
+              for (int q = 0; q < qualifiers.length; q++) {
+                put.add(families[f], qualifiers[q], (long) val,
+                  Bytes.toBytes(val));
+              }
+            }
+            region.put(put);
+            if (val > 0 && val % 47 == 0){
+              //System.out.println("put iteration = " + val);
+              Delete delete = new Delete(row, (long)val-30, null);
+              region.delete(delete, null, true);
+            }
+            val++;
+          }
+        } catch (IOException e) {
+          LOG.error("error while putting records", e);
+          error = e;
+          break;
+        }
+      }
+
+    }
+
+  }
+
+
+  /**
+   * Writes very wide records and gets the latest row every time.
+   * Flushes and compacts the region every now and then to keep things
+   * realistic.
+   *
+   * @throws IOException          by flush / scan / compaction
+   * @throws InterruptedException when joining threads
+   */
+  public void testWritesWhileGetting()
+    throws IOException, InterruptedException {
+    byte[] tableName = Bytes.toBytes("testWritesWhileGetting");
+    int testCount = 200;
+    int numRows = 1;
+    int numFamilies = 10;
+    int numQualifiers = 100;
+    int flushInterval = 10;
+    int compactInterval = 10 * flushInterval;
+    byte[][] families = new byte[numFamilies][];
+    for (int i = 0; i < numFamilies; i++) {
+      families[i] = Bytes.toBytes("family" + i);
+    }
+    byte[][] qualifiers = new byte[numQualifiers][];
+    for (int i = 0; i < numQualifiers; i++) {
+      qualifiers[i] = Bytes.toBytes("qual" + i);
+    }
+
+    String method = "testWritesWhileGetting";
+    initHRegion(tableName, method, families);
+    PutThread putThread = new PutThread(numRows, families, qualifiers);
+    putThread.start();
+    FlushThread flushThread = new FlushThread();
+    flushThread.start();
+
+    Get get = new Get(Bytes.toBytes("row0"));
+    Result result = null;
+
+    int expectedCount = numFamilies * numQualifiers;
+
+    long prevTimestamp = 0L;
+    for (int i = 0; i < testCount; i++) {
+
+      if (i != 0 && i % compactInterval == 0) {
+        region.compactStores(true);
+      }
+
+      if (i != 0 && i % flushInterval == 0) {
+        //System.out.println("iteration = " + i);
+        flushThread.flush();
+      }
+
+      boolean previousEmpty = result == null || result.isEmpty();
+      result = region.get(get, null);
+      if (!result.isEmpty() || !previousEmpty || i > compactInterval) {
+        Assert.assertEquals("i=" + i, expectedCount, result.size());
+        // TODO this was removed, now what dangit?!
+        // search looking for the qualifier in question?
+        long timestamp = 0;
+        for (KeyValue kv : result.sorted()) {
+          if (Bytes.equals(kv.getFamily(), families[0])
+            && Bytes.equals(kv.getQualifier(), qualifiers[0])) {
+            timestamp = kv.getTimestamp();
+          }
+        }
+        Assert.assertTrue(timestamp >= prevTimestamp);
+        prevTimestamp = timestamp;
+      }
+    }
+
+    putThread.done();
+    putThread.join();
+    putThread.checkNoError();
+
+    flushThread.done();
+    flushThread.join();
+    flushThread.checkNoError();
+  }
+
+
+  public void testIndexesScanWithOneDeletedRow() throws IOException {
+    byte[] tableName = Bytes.toBytes("testIndexesScanWithOneDeletedRow");
+    byte[] family = Bytes.toBytes("family");
+
+    //Setting up region
+    String method = "testIndexesScanWithOneDeletedRow";
+    initHRegion(tableName, method, new HBaseConfiguration(), family);
+
+    Put put = new Put(Bytes.toBytes(1L));
+    put.add(family, qual1, 1L, Bytes.toBytes(1L));
+    region.put(put);
+
+    region.flushcache();
+
+    Delete delete = new Delete(Bytes.toBytes(1L), 1L, null);
+    //delete.deleteColumn(family, qual1);
+    region.delete(delete, null, true);
+
+    put = new Put(Bytes.toBytes(2L));
+    put.add(family, qual1, 2L, Bytes.toBytes(2L));
+    region.put(put);
+
+    Scan idxScan = new Scan();
+    idxScan.addFamily(family);
+    idxScan.setFilter(new FilterList(FilterList.Operator.MUST_PASS_ALL,
+      Arrays.<Filter>asList(new SingleColumnValueFilter(family, qual1,
+        CompareFilter.CompareOp.GREATER_OR_EQUAL,
+        new BinaryComparator(Bytes.toBytes(0L))),
+        new SingleColumnValueFilter(family, qual1,
+          CompareFilter.CompareOp.LESS_OR_EQUAL,
+          new BinaryComparator(Bytes.toBytes(3L)))
+      )));
+    InternalScanner scanner = region.getScanner(idxScan);
+    List<KeyValue> res = new ArrayList<KeyValue>();
+
+    //long start = System.nanoTime();
+    while (scanner.next(res)) ;
+    //long end = System.nanoTime();
+    //System.out.println("memStoreEmpty=" + memStoreEmpty + ", time=" + (end - start)/1000000D);
+    assertEquals(1L, res.size());
+
+  }
+
+
+  
   private void putData(int startRow, int numRows, byte [] qf,
       byte [] ...families)
   throws IOException {

Modified: hadoop/hbase/trunk/core/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/core/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java?rev=944527&r1=944526&r2=944527&view=diff
==============================================================================
--- hadoop/hbase/trunk/core/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java (original)
+++ hadoop/hbase/trunk/core/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java Sat May 15 00:16:40 2010
@@ -240,7 +240,7 @@ public class TestStore extends TestCase 
 
   private void flush(int storeFilessize) throws IOException{
     this.store.snapshot();
-    this.store.flushCache(id++);
+    flushStore(store, id++);
     assertEquals(storeFilessize, this.store.getStorefiles().size());
     assertEquals(0, this.store.memstore.kvset.size());
   }
@@ -283,7 +283,7 @@ public class TestStore extends TestCase 
     assertTrue(ret > 0);
 
     // then flush.
-    this.store.flushCache(id++);
+    flushStore(store, id++);
     assertEquals(1, this.store.getStorefiles().size());
     // from the one we inserted up there, and a new one
     assertEquals(2, this.store.memstore.kvset.size());
@@ -309,4 +309,11 @@ public class TestStore extends TestCase 
     assertEquals(oldValue, Bytes.toLong(results.get(1).getValue()));
 
   }
+
+  private static void flushStore(Store store, long id) throws IOException {
+    StoreFlusher storeFlusher = store.getStoreFlusher(id);
+    storeFlusher.prepare();
+    storeFlusher.flushCache();
+    storeFlusher.commit();
+  }
 }
\ No newline at end of file


