hbase-commits mailing list archives

From j...@apache.org
Subject svn commit: r643761 [1/2] - in /hadoop/hbase/trunk: ./ src/java/org/apache/hadoop/hbase/ src/java/org/apache/hadoop/hbase/master/ src/java/org/apache/hadoop/hbase/regionserver/ src/java/org/apache/hadoop/hbase/util/ src/test/org/apache/hadoop/hbase/ sr...
Date Wed, 02 Apr 2008 06:58:33 GMT
Author: jimk
Date: Tue Apr  1 23:58:26 2008
New Revision: 643761

URL: http://svn.apache.org/viewvc?rev=643761&view=rev
Log:
HBASE-469   Streamline HStore startup and compactions

HMerge, HRegionServer

- changes that reflect changes to HRegion, CompactSplitThread and Flusher methods

ServerManager

- Return a zero-length array to the region server if it is exiting or quiesced and the Master is not yet ready to shut down.

QueueEntry

- Removed; no longer used.

CompactSplitThread

- Make compactionQueue a queue of HRegion.
- Add a Set<HRegion> so we can quickly determine whether a region is already queued; BlockingQueue.contains() does a linear scan of the queue (see the sketch after this list).
- Add a lock and an interruptPolitely method so that compactions/splits in progress are not interrupted.
- Don't add a region to the queue if it is already present.
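
A minimal sketch of the queue-plus-set pattern described above (the Region and CompactionScheduler names are illustrative stand-ins, not the actual HBase classes):

  import java.util.HashSet;
  import java.util.Set;
  import java.util.concurrent.BlockingQueue;
  import java.util.concurrent.LinkedBlockingQueue;

  class Region { /* placeholder for HRegion */ }

  class CompactionScheduler {
    private final BlockingQueue<Region> compactionQueue =
      new LinkedBlockingQueue<Region>();
    // HashSet gives O(1) membership tests; BlockingQueue.contains() is O(n).
    private final Set<Region> regionsInQueue = new HashSet<Region>();

    void compactionRequested(Region r) {
      synchronized (regionsInQueue) {
        // Skip regions already waiting so the queue never holds duplicates.
        if (!regionsInQueue.contains(r)) {
          regionsInQueue.add(r);
          compactionQueue.add(r);
        }
      }
    }

    Region take() throws InterruptedException {
      Region r = compactionQueue.take();
      synchronized (regionsInQueue) {
        regionsInQueue.remove(r);
      }
      return r;
    }
  }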

Flusher

- Change the queue from a DelayQueue to a BlockingQueue, with HRegion entries instead of QueueEntry.
- Add a Set<HRegion> to quickly determine whether a region is already queued, avoiding the linear scan of BlockingQueue.contains().
- Only queue a region for an optional cache flush if its last flush is older than now - optionalFlushInterval (see the sketch after this list).
- Only add a region to the queue if it is not already present.
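
A sketch of the optional-flush gating described above; FlushableRegion is an illustrative interface, not the real HRegion API (the actual code is in the Flusher.java hunk below):

  import java.util.HashSet;
  import java.util.Set;
  import java.util.concurrent.BlockingQueue;
  import java.util.concurrent.LinkedBlockingQueue;

  interface FlushableRegion {
    long getLastFlushTime();
    void setLastFlushTime(long t);
  }

  class OptionalFlushCheck {
    // Default matches hbase.regionserver.optionalcacheflushinterval.
    private final long optionalFlushInterval = 30 * 60 * 1000L;
    private final BlockingQueue<FlushableRegion> flushQueue =
      new LinkedBlockingQueue<FlushableRegion>();
    private final Set<FlushableRegion> regionsInQueue =
      new HashSet<FlushableRegion>();

    void queueOptionalFlushes(Set<FlushableRegion> regions) {
      long now = System.currentTimeMillis();
      for (FlushableRegion r : regions) {
        synchronized (regionsInQueue) {
          // Queue a region at most once, and only if its last flush is
          // older than now - optionalFlushInterval.
          if (!regionsInQueue.contains(r) &&
              (now - optionalFlushInterval) > r.getLastFlushTime()) {
            regionsInQueue.add(r);
            flushQueue.add(r);
            r.setLastFlushTime(now);
          }
        }
      }
    }
  }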

HRegion

- Don't request a cache flush if one has already been requested.
- Add setLastFlushTime so the flusher can set it once it has queued an optional flush.
- Replace largestHStore with getLargestHStoreSize, which returns a long instead of an HStoreSize object.
- Add midKey as a parameter to splitRegion.
- Reorder the start of splitRegion so it does no work before validating its parameters.
- Remove needsSplit and compactIfNeeded; no longer needed.
- compactStores now returns the midKey if a split is needed.
- snapshotMemcaches now sets flushRequested to false and lastFlushTime to now.
- update does not request a cache flush if one has already been requested.
- Override equals and hashCode so HRegions can be stored in a HashSet (see the sketch after this list).
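
The point of the equals/hashCode override: two HRegion instances naming the same region must collapse to a single HashSet entry, or the regionsInQueue membership tests above would never detect duplicates. A sketch with a stand-in class (the committed version, in the HRegion.java hunk below, compares hash codes after a cast; this sketch adds an instanceof guard):

  class RegionSketch {
    private final String regionName;

    RegionSketch(String regionName) {
      this.regionName = regionName;
    }

    @Override
    public boolean equals(Object o) {
      // Two sketches are equal iff they name the same region.
      return o instanceof RegionSketch &&
        this.regionName.equals(((RegionSketch) o).regionName);
    }

    @Override
    public int hashCode() {
      // Hash on the region name, as the commit does.
      return this.regionName.hashCode();
    }
  }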

HStore

- loadHStoreFiles now computes the max sequence id and the initial size of the store.
- Add a getter for family.
- internalCacheFlush updates the store size and, with debug logging enabled, logs both the size of the cache flush and the resulting map file size.
- Remove needsCompaction and hasReferences; no longer needed.
- compact() returns the midKey if the store needs to be split.
- compact() does all its checking before actually starting a compaction.
- If the store size is greater than desiredMaxFileSize, compact() returns the store's midKey regardless of whether a compaction was actually done.
- Add more synchronization in completeCompaction while iterating over storeFiles.
- completeCompaction computes the new store size.
- New method checkSplit replaces method size; it returns the midKey if the store needs to be split and can be split (see the sketch after this list).
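
Since this message is part [1/2] and the diff below is truncated before the new method appears, here is a hedged sketch of the checkSplit() shape described above (getMidKeyOfLargestFile is a hypothetical helper; the real implementation in HStore.java may differ in detail):

  import org.apache.hadoop.io.Text;

  class CheckSplitSketch {
    private volatile long storeSize;        // maintained by flushes/compactions
    private final long desiredMaxFileSize;  // hbase.hregion.max.filesize

    CheckSplitSketch(long desiredMaxFileSize) {
      this.desiredMaxFileSize = desiredMaxFileSize;
    }

    /** @return the mid key if the store needs to and can be split, else null */
    Text checkSplit() {
      if (this.storeSize <= this.desiredMaxFileSize) {
        return null;  // under the size threshold: no split
      }
      // May return null if the store cannot be split (e.g. reference files).
      return getMidKeyOfLargestFile();
    }

    private Text getMidKeyOfLargestFile() {
      return null;  // placeholder: real code reads the largest MapFile's midpoint
    }
  }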

HStoreSize

- Removed; no longer needed.

HBaseTestCase

- Only set fs if it has not already been set by a subclass.

TestTableIndex, TestTableMapReduce

- Call FSUtil.deleteFully to clean up cruft left in the local fs by MapReduce.

Removed:
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HStoreSize.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/QueueEntry.java
Modified:
    hadoop/hbase/trunk/CHANGES.txt
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/HMerge.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/ServerManager.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/Flusher.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HStore.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/Memcache.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/util/MetaUtils.java
    hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/HBaseClusterTestCase.java
    hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/HBaseTestCase.java
    hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/TestHBaseCluster.java
    hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/mapred/TestTableIndex.java
    hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/mapred/TestTableMapReduce.java
    hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/TestCompaction.java
    hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/TestHMemcache.java
    hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/TestHRegion.java
    hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/TestSplit.java
    hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/TestTimestamp.java
    hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/util/TestMergeTool.java

Modified: hadoop/hbase/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/CHANGES.txt?rev=643761&r1=643760&r2=643761&view=diff
==============================================================================
--- hadoop/hbase/trunk/CHANGES.txt (original)
+++ hadoop/hbase/trunk/CHANGES.txt Tue Apr  1 23:58:26 2008
@@ -11,6 +11,9 @@
 
   NEW FEATURES
    HBASE-548   Tool to online single region
+   
+  IMPROVEMENTS
+   HBASE-469   Streamline HStore startup and compactions
 
 Release 0.1.0
 

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/HMerge.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/HMerge.java?rev=643761&r1=643760&r2=643761&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/HMerge.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/HMerge.java Tue Apr  1 23:58:26 2008
@@ -144,17 +144,16 @@
       long currentSize = 0;
       HRegion nextRegion = null;
       long nextSize = 0;
-      Text midKey = new Text();
       for (int i = 0; i < info.length - 1; i++) {
         if (currentRegion == null) {
           currentRegion =
             new HRegion(tabledir, hlog, fs, conf, info[i], null, null);
-          currentSize = currentRegion.largestHStore(midKey).getAggregate();
+          currentSize = currentRegion.getLargestHStoreSize();
         }
         nextRegion =
           new HRegion(tabledir, hlog, fs, conf, info[i + 1], null, null);
 
-        nextSize = nextRegion.largestHStore(midKey).getAggregate();
+        nextSize = nextRegion.getLargestHStoreSize();
 
         if ((currentSize + nextSize) <= (maxFilesize / 2)) {
           // We merge two adjacent regions if their total size is less than

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/ServerManager.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/ServerManager.java?rev=643761&r1=643760&r2=643761&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/ServerManager.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/ServerManager.java Tue Apr  1 23:58:26 2008
@@ -128,6 +128,9 @@
   }
   
   /**
+   * Called to process the messages sent from the region server to the master
+   * along with the heart beat.
+   * 
    * @param serverInfo
    * @param msgs
    * @return messages from master to region server indicating what region
@@ -142,7 +145,7 @@
     if (msgs.length > 0) {
       if (msgs[0].getMsg() == HMsg.MSG_REPORT_EXITING) {
         processRegionServerExit(serverName, msgs);
-        return new HMsg[]{msgs[0]};
+        return new HMsg[0];
       } else if (msgs[0].getMsg() == HMsg.MSG_REPORT_QUIESCED) {
         LOG.info("Region server " + serverName + " quiesced");
         master.quiescedMetaServers.incrementAndGet();
@@ -157,6 +160,11 @@
     }
 
     if (master.shutdownRequested && !master.closed.get()) {
+      if (msgs.length > 0 && msgs[0].getMsg() == HMsg.MSG_REPORT_QUIESCED) {
+        // Server is already quiesced, but we aren't ready to shut down
+        // return empty response
+        return new HMsg[0];
+      }
       // Tell the server to stop serving any user regions
       return new HMsg[]{new HMsg(HMsg.MSG_REGIONSERVER_QUIESCE)};
     }
@@ -522,7 +530,7 @@
   public int averageLoad() {
     return 0;
   }
-  
+
   /** @return the number of active servers */
   public int numServers() {
     return serversToServerInfo.size();

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java?rev=643761&r1=643760&r2=643761&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java Tue Apr  1 23:58:26 2008
@@ -20,6 +20,7 @@
 package org.apache.hadoop.hbase.regionserver;
 
 import java.io.IOException;
+import java.util.HashSet;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
@@ -42,19 +43,22 @@
 class CompactSplitThread extends Thread 
 implements RegionUnavailableListener, HConstants {
   static final Log LOG = LogFactory.getLog(CompactSplitThread.class);
-    
+  
   private HTable root = null;
   private HTable meta = null;
-  private long startTime;
+  private volatile long startTime;
   private final long frequency;
+  private final Integer lock = new Integer(0);
   
-  private HRegionServer server;
-  private HBaseConfiguration conf;
+  private final HRegionServer server;
+  private final HBaseConfiguration conf;
   
-  private final BlockingQueue<QueueEntry> compactionQueue =
-    new LinkedBlockingQueue<QueueEntry>();
-
-  /** constructor */
+  private final BlockingQueue<HRegion> compactionQueue =
+    new LinkedBlockingQueue<HRegion>();
+  
+  private final HashSet<HRegion> regionsInQueue = new HashSet<HRegion>();
+  
+  /** @param server */
   public CompactSplitThread(HRegionServer server) {
     super();
     this.server = server;
@@ -68,19 +72,26 @@
   @Override
   public void run() {
     while (!server.isStopRequested()) {
-      QueueEntry e = null;
+      HRegion r = null;
       try {
-        e = compactionQueue.poll(this.frequency, TimeUnit.MILLISECONDS);
-        if (e == null) {
-          continue;
+        r = compactionQueue.poll(this.frequency, TimeUnit.MILLISECONDS);
+        if (r != null) {
+          synchronized (regionsInQueue) {
+            regionsInQueue.remove(r);
+          }
+          synchronized (lock) {
+            // Don't interrupt us while we are working
+            Text midKey = r.compactStores();
+            if (midKey != null) {
+              split(r, midKey);
+            }
+          }
         }
-        e.getRegion().compactIfNeeded();
-        split(e.getRegion());
       } catch (InterruptedException ex) {
         continue;
       } catch (IOException ex) {
         LOG.error("Compaction failed" +
-            (e != null ? (" for region " + e.getRegion().getRegionName()) : ""),
+            (r != null ? (" for region " + r.getRegionName()) : ""),
             RemoteExceptionHandler.checkIOException(ex));
         if (!server.checkFileSystem()) {
           break;
@@ -88,30 +99,35 @@
 
       } catch (Exception ex) {
         LOG.error("Compaction failed" +
-            (e != null ? (" for region " + e.getRegion().getRegionName()) : ""),
+            (r != null ? (" for region " + r.getRegionName()) : ""),
             ex);
         if (!server.checkFileSystem()) {
           break;
         }
       }
     }
+    regionsInQueue.clear();
+    compactionQueue.clear();
     LOG.info(getName() + " exiting");
   }
   
   /**
-   * @param e QueueEntry for region to be compacted
+   * @param r HRegion store belongs to
    */
-  public void compactionRequested(QueueEntry e) {
-    compactionQueue.add(e);
-  }
-  
-  void compactionRequested(final HRegion r) {
-    compactionRequested(new QueueEntry(r, System.currentTimeMillis()));
+  public synchronized void compactionRequested(HRegion r) {
+    LOG.debug("Compaction requested for region: " + r.getRegionName());
+    synchronized (regionsInQueue) {
+      if (!regionsInQueue.contains(r)) {
+        compactionQueue.add(r);
+        regionsInQueue.add(r);
+      }
+    }
   }
   
-  private void split(final HRegion region) throws IOException {
+  private void split(final HRegion region, final Text midKey)
+  throws IOException {
     final HRegionInfo oldRegionInfo = region.getRegionInfo();
-    final HRegion[] newRegions = region.splitRegion(this);
+    final HRegion[] newRegions = region.splitRegion(this, midKey);
     if (newRegions == null) {
       // Didn't need to be split
       return;
@@ -196,6 +212,15 @@
       }
     } finally {
       server.getWriteLock().unlock();
+    }
+  }
+
+  /**
+   * Only interrupt once it's done with a run through the work loop.
+   */ 
+  void interruptPolitely() {
+    synchronized (lock) {
+      interrupt();
     }
   }
 }

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/Flusher.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/Flusher.java?rev=643761&r1=643760&r2=643761&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/Flusher.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/Flusher.java Tue Apr  1 23:58:26 2008
@@ -20,14 +20,16 @@
 package org.apache.hadoop.hbase.regionserver;
 
 import java.io.IOException;
-import java.util.concurrent.DelayQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
+import java.util.HashSet;
 import java.util.Set;
-import java.util.Iterator;
 import java.util.ConcurrentModificationException;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.DroppedSnapshotException;
 import org.apache.hadoop.hbase.RemoteExceptionHandler;
@@ -35,60 +37,60 @@
 /** Flush cache upon request */
 class Flusher extends Thread implements CacheFlushListener {
   static final Log LOG = LogFactory.getLog(Flusher.class);
-  private final DelayQueue<QueueEntry> flushQueue =
-    new DelayQueue<QueueEntry>();
+  private final BlockingQueue<HRegion> flushQueue =
+    new LinkedBlockingQueue<HRegion>();
+  
+  private final HashSet<HRegion> regionsInQueue = new HashSet<HRegion>();
 
+  private final long threadWakeFrequency;
   private final long optionalFlushPeriod;
   private final HRegionServer server;
-  private final HBaseConfiguration conf;
   private final Integer lock = new Integer(0);
   
-  /** constructor */
-  public Flusher(final HRegionServer server) {
+  /**
+   * @param conf
+   * @param server
+   */
+  public Flusher(final HBaseConfiguration conf, final HRegionServer server) {
     super();
     this.server = server;
-    conf = server.conf;
     this.optionalFlushPeriod = conf.getLong(
-      "hbase.regionserver.optionalcacheflushinterval", 30 * 60 * 1000L);
+        "hbase.regionserver.optionalcacheflushinterval", 30 * 60 * 1000L);
+    this.threadWakeFrequency = conf.getLong(
+        HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
   }
   
   /** {@inheritDoc} */
   @Override
   public void run() {
+    long lastOptionalCheck = System.currentTimeMillis(); 
     while (!server.isStopRequested()) {
-      QueueEntry e = null;
+      HRegion r = null;
       try {
-        e = flushQueue.poll(server.threadWakeFrequency, TimeUnit.MILLISECONDS);
-        if (e == null) {
-          continue;
-        }
-        synchronized(lock) { // Don't interrupt while we're working
-          if (e.getRegion().flushcache()) {
-            server.compactionRequested(e);
-          }
-            
-          e.setExpirationTime(System.currentTimeMillis() +
-              optionalFlushPeriod);
-          flushQueue.add(e);
-        }
-        
-        // Now ensure that all the active regions are in the queue
-        Set<HRegion> regions = server.getRegionsToCheck();
-        for (HRegion r: regions) {
-          e = new QueueEntry(r, r.getLastFlushTime() + optionalFlushPeriod);
-          synchronized (flushQueue) {
-            if (!flushQueue.contains(e)) {
-              flushQueue.add(e);
+        long now = System.currentTimeMillis();
+        if (now - threadWakeFrequency > lastOptionalCheck) {
+          lastOptionalCheck = now;
+          // Queue up regions for optional flush if they need it
+          Set<HRegion> regions = server.getRegionsToCheck();
+          for (HRegion region: regions) {
+            synchronized (regionsInQueue) {
+              if (!regionsInQueue.contains(region) &&
+                  (now - optionalFlushPeriod) > region.getLastFlushTime()) {
+                regionsInQueue.add(region);
+                flushQueue.add(region);
+                region.setLastFlushTime(now);
+              }
             }
           }
         }
-
-        // Now make sure that the queue only contains active regions
-        synchronized (flushQueue) {
-          for (Iterator<QueueEntry> i = flushQueue.iterator(); i.hasNext();  ) {
-            e = i.next();
-            if (!regions.contains(e.getRegion())) {
-              i.remove();
+        r = flushQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS);
+        if (r != null) {
+          synchronized (regionsInQueue) {
+            regionsInQueue.remove(r);
+          }
+          synchronized (lock) { // Don't interrupt while we're working
+            if (r.flushcache()) {
+              server.compactSplitThread.compactionRequested(r);
             }
           }
         }
@@ -108,32 +110,32 @@
         server.stop();
       } catch (IOException ex) {
         LOG.error("Cache flush failed" +
-          (e != null ? (" for region " + e.getRegion().getRegionName()) : ""),
+          (r != null ? (" for region " + r.getRegionName()) : ""),
           RemoteExceptionHandler.checkIOException(ex));
         if (!server.checkFileSystem()) {
           break;
         }
       } catch (Exception ex) {
         LOG.error("Cache flush failed" +
-          (e != null ? (" for region " + e.getRegion().getRegionName()) : ""),
+          (r != null ? (" for region " + r.getRegionName()) : ""),
           ex);
         if (!server.checkFileSystem()) {
           break;
         }
       }
     }
+    regionsInQueue.clear();
     flushQueue.clear();
     LOG.info(getName() + " exiting");
   }
   
   /** {@inheritDoc} */
-  public void flushRequested(HRegion region) {
-    QueueEntry e = new QueueEntry(region, System.currentTimeMillis());
-    synchronized (flushQueue) {
-      if (flushQueue.contains(e)) {
-        flushQueue.remove(e);
+  public void flushRequested(HRegion r) {
+    synchronized (regionsInQueue) {
+      if (!regionsInQueue.contains(r)) {
+        regionsInQueue.add(r);
+        flushQueue.add(r);
       }
-      flushQueue.add(e);
     }
   }
   

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=643761&r1=643760&r2=643761&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java Tue Apr  1 23:58:26 2008
@@ -316,6 +316,7 @@
     new ConcurrentHashMap<Long, TreeMap<HStoreKey, byte []>>();
 
   final AtomicLong memcacheSize = new AtomicLong(0);
+  private volatile boolean flushRequested;
 
   final Path basedir;
   final HLog log;
@@ -348,7 +349,6 @@
   private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
   private final Integer updateLock = new Integer(0);
   private final Integer splitLock = new Integer(0);
-  private final long desiredMaxFileSize;
   private final long minSequenceId;
   final AtomicInteger activeScannerCount = new AtomicInteger(0);
 
@@ -359,6 +359,8 @@
   /**
    * HRegion constructor.
    *
+   * @param basedir qualified path of directory where region should be located,
+   * usually the table directory.
    * @param log The HLog is the outbound log for any updates to the HRegion
    * (There's a single HLog for all the HRegions on a single HRegionServer.)
    * The log file is a logfile from the previous execution that's
@@ -366,20 +368,19 @@
    * appropriate log info for this HRegion. If there is a previous log file
    * (implying that the HRegion has been written-to before), then read it from
    * the supplied path.
-   * @param basedir qualified path of directory where region should be located,
-   * usually the table directory.
    * @param fs is the filesystem.  
    * @param conf is global configuration settings.
    * @param regionInfo - HRegionInfo that describes the region
    * @param initialFiles If there are initial files (implying that the HRegion
    * is new), then read them from the supplied path.
-   * @param listener an object that implements CacheFlushListener or null
+   * @param flushListener an object that implements CacheFlushListener or null
+   * or null
    * @throws IOException
    */
   public HRegion(Path basedir, HLog log, FileSystem fs, HBaseConfiguration conf, 
-      HRegionInfo regionInfo, Path initialFiles, CacheFlushListener listener)
-    throws IOException {
-    this(basedir, log, fs, conf, regionInfo, initialFiles, listener, null);
+      HRegionInfo regionInfo, Path initialFiles,
+      CacheFlushListener flushListener) throws IOException {
+    this(basedir, log, fs, conf, regionInfo, initialFiles, flushListener, null);
   }
   
   /**
@@ -399,15 +400,15 @@
    * @param regionInfo - HRegionInfo that describes the region
    * @param initialFiles If there are initial files (implying that the HRegion
    * is new), then read them from the supplied path.
-   * @param listener an object that implements CacheFlushListener or null
+   * @param flushListener an object that implements CacheFlushListener or null
    * @param reporter Call on a period so hosting server can report we're
    * making progress to master -- otherwise master might think region deploy
    * failed.  Can be null.
    * @throws IOException
    */
   public HRegion(Path basedir, HLog log, FileSystem fs, HBaseConfiguration conf, 
-      HRegionInfo regionInfo, Path initialFiles, CacheFlushListener listener,
-      final Progressable reporter)
+      HRegionInfo regionInfo, Path initialFiles,
+      CacheFlushListener flushListener, final Progressable reporter)
     throws IOException {
     
     this.basedir = basedir;
@@ -415,6 +416,8 @@
     this.fs = fs;
     this.conf = conf;
     this.regionInfo = regionInfo;
+    this.flushListener = flushListener;
+    this.flushRequested = false;
     this.threadWakeFrequency = conf.getLong(THREAD_WAKE_FREQUENCY, 10 * 1000);
     this.regiondir = new Path(basedir, this.regionInfo.getEncodedName());
     Path oldLogFile = new Path(regiondir, HREGION_OLDLOGFILE_NAME);
@@ -466,20 +469,16 @@
     // By default, we flush the cache when 64M.
     this.memcacheFlushSize = conf.getInt("hbase.hregion.memcache.flush.size",
       1024*1024*64);
-    this.flushListener = listener;
+
     this.blockingMemcacheSize = this.memcacheFlushSize *
       conf.getInt("hbase.hregion.memcache.block.multiplier", 1);
 
-    // By default we split region if a file > DEFAULT_MAX_FILE_SIZE.
-    this.desiredMaxFileSize =
-      conf.getLong("hbase.hregion.max.filesize", DEFAULT_MAX_FILE_SIZE);
-
     // HRegion is ready to go!
     this.writestate.compacting = false;
     this.lastFlushTime = System.currentTimeMillis();
     LOG.info("region " + this.regionInfo.getRegionName() + " available");
   }
-  
+
   /**
    * @return Updates to this region need to have a sequence id that is >= to
    * the this number.
@@ -543,7 +542,7 @@
         // region.
         writestate.writesEnabled = false;
         LOG.debug("compactions and cache flushes disabled for region " +
-          regionName);
+            regionName);
         while (writestate.compacting || writestate.flushing) {
           LOG.debug("waiting for" +
               (writestate.compacting ? " compaction" : "") +
@@ -617,7 +616,7 @@
       }
     }
   }
-  
+
   //////////////////////////////////////////////////////////////////////////////
   // HRegion accessors
   //////////////////////////////////////////////////////////////////////////////
@@ -672,6 +671,11 @@
     return this.lastFlushTime;
   }
   
+  /** @param t the lastFlushTime */
+  void setLastFlushTime(long t) {
+    this.lastFlushTime = t;
+  }
+  
   //////////////////////////////////////////////////////////////////////////////
   // HRegion maintenance.  
   //
@@ -679,34 +683,16 @@
   // upkeep.
   //////////////////////////////////////////////////////////////////////////////
 
-  /**
-   * @param midkey
-   * @return returns size of largest HStore.  Also returns whether store is
-   * splitable or not (Its not splitable if region has a store that has a
-   * reference store file).
-   */
-  public HStoreSize largestHStore(Text midkey) {
-    HStoreSize biggest = null;
-    boolean splitable = true;
+  /** @return returns size of largest HStore. */
+  public long getLargestHStoreSize() {
+    long size = 0;
     for (HStore h: stores.values()) {
-      HStoreSize size = h.size(midkey);
-      // If we came across a reference down in the store, then propagate
-      // fact that region is not splitable.
-      if (splitable) {
-        splitable = size.splitable;
-      }
-      if (biggest == null) {
-        biggest = size;
-        continue;
-      }
-      if(size.getAggregate() > biggest.getAggregate()) { // Largest so far
-        biggest = size;
+      long storeSize = h.getSize();
+      if (storeSize > size) {
+        size = storeSize;
       }
     }
-    if (biggest != null) {
-      biggest.setSplitable(splitable);
-    }
-    return biggest;
+    return size;
   }
   
   /*
@@ -715,21 +701,17 @@
    * but instead create new 'reference' store files that read off the top and
    * bottom ranges of parent store files.
    * @param listener May be null.
+   * @param midKey key on which to split region
    * @return two brand-new (and open) HRegions or null if a split is not needed
    * @throws IOException
    */
-  HRegion[] splitRegion(final RegionUnavailableListener listener)
-    throws IOException {
+  HRegion[] splitRegion(final RegionUnavailableListener listener,
+      final Text midKey) throws IOException {
     synchronized (splitLock) {
-      Text midKey = new Text();
-      if (closed.get() || !needsSplit(midKey)) {
+      if (closed.get()) {
         return null;
       }
-      Path splits = new Path(this.regiondir, SPLITDIR);
-      if(!this.fs.exists(splits)) {
-        this.fs.mkdirs(splits);
-      }
-      // Make copies just in case and add start/end key checking: hbase-428.
+      // Add start/end key checking: hbase-428.
       Text startKey = new Text(this.regionInfo.getStartKey());
       Text endKey = new Text(this.regionInfo.getEndKey());
       if (startKey.equals(midKey)) {
@@ -740,6 +722,11 @@
         LOG.debug("Endkey and midkey are same, not splitting");
         return null;
       }
+      LOG.info("Starting split of region " + getRegionName());
+      Path splits = new Path(this.regiondir, SPLITDIR);
+      if(!this.fs.exists(splits)) {
+        this.fs.mkdirs(splits);
+      }
       HRegionInfo regionAInfo = new HRegionInfo(this.regionInfo.getTableDesc(),
         startKey, midKey);
       Path dirA = new Path(splits, regionAInfo.getEncodedName());
@@ -806,71 +793,6 @@
   }
   
   /*
-   * Iterates through all the HStores and finds the one with the largest
-   * MapFile size. If the size is greater than the (currently hard-coded)
-   * threshold, returns true indicating that the region should be split. The
-   * midKey for the largest MapFile is returned through the midKey parameter.
-   * It is possible for us to rule the region non-splitable even in excess of
-   * configured size.  This happens if region contains a reference file.  If
-   * a reference file, the region can not be split.
-   * 
-   * Note that there is no need to do locking in this method because it calls
-   * largestHStore which does the necessary locking.
-   * 
-   * @param midKey midKey of the largest MapFile
-   * @return true if the region should be split. midKey is set by this method.
-   * Check it for a midKey value on return.
-   */
-  boolean needsSplit(Text midKey) {
-    HStoreSize biggest = largestHStore(midKey);
-    if (biggest == null || midKey.getLength() == 0 || 
-      (midKey.equals(getStartKey()) && midKey.equals(getEndKey())) ) {
-      return false;
-    }
-    boolean split = (biggest.getAggregate() >= this.desiredMaxFileSize);
-    if (split) {
-      if (!biggest.isSplitable()) {
-        LOG.warn("Region " + getRegionName().toString() +
-            " is NOT splitable though its aggregate size is " +
-            StringUtils.humanReadableInt(biggest.getAggregate()) +
-            " and desired size is " +
-            StringUtils.humanReadableInt(this.desiredMaxFileSize));
-        split = false;
-      } else {
-        LOG.info("Splitting " + getRegionName().toString() +
-            " because largest aggregate size is " +
-            StringUtils.humanReadableInt(biggest.getAggregate()) +
-            " and desired size is " +
-            StringUtils.humanReadableInt(this.desiredMaxFileSize));
-      }
-    }
-    return split;
-  }
-  
-  /**
-   * Only do a compaction if it is necessary
-   * 
-   * @return whether or not there was a compaction
-   * @throws IOException
-   */
-  public boolean compactIfNeeded() throws IOException {
-    boolean needsCompaction = false;
-    for (HStore store: stores.values()) {
-      if (store.needsCompaction()) {
-        needsCompaction = true;
-        if (LOG.isDebugEnabled()) {
-          LOG.debug(store.toString() + " needs compaction");
-        }
-        break;
-      }
-    }
-    if (!needsCompaction) {
-      return false;
-    }
-    return compactStores();
-  }
-  
-  /*
    * @param dir
    * @return compaction directory for the passed in <code>dir</code>
    */
@@ -893,59 +815,53 @@
    */
   private void doRegionCompactionCleanup() throws IOException {
     if (this.fs.exists(this.regionCompactionDir)) {
-      this.fs.delete(this.regionCompactionDir);
+      FileUtil.fullyDelete(this.fs, this.regionCompactionDir);
     }
   }
-  
+
   /**
-   * Compact all the stores.  This should be called periodically to make sure 
-   * the stores are kept manageable.  
+   * Called by compaction thread and after region is opened to compact the
+   * HStores if necessary.
    *
    * <p>This operation could block for a long time, so don't call it from a 
    * time-sensitive thread.
    *
-   * @return Returns TRUE if the compaction has completed.  FALSE, if the
-   * compaction was not carried out, because the HRegion is busy doing
-   * something else storage-intensive (like flushing the cache). The caller
-   * should check back later.
-   * 
    * Note that no locking is necessary at this level because compaction only
    * conflicts with a region split, and that cannot happen because the region
    * server does them sequentially and not in parallel.
    * 
+   * @return mid key if split is needed
    * @throws IOException
    */
-  public boolean compactStores() throws IOException {
+  public Text compactStores() throws IOException {
+    Text midKey = null;
     if (this.closed.get()) {
-      return false;
+      return midKey;
     }
     try {
       synchronized (writestate) {
         if (!writestate.compacting && writestate.writesEnabled) {
           writestate.compacting = true;
         } else {
-          LOG.info("NOT compacting region " +
-              this.regionInfo.getRegionName().toString() + ": compacting=" +
-              writestate.compacting + ", writesEnabled=" +
+          LOG.info("NOT compacting region " + getRegionName() +
+              ": compacting=" + writestate.compacting + ", writesEnabled=" +
               writestate.writesEnabled);
-            return false;
+            return midKey;
         }
       }
+      LOG.info("starting compaction on region " + getRegionName());
       long startTime = System.currentTimeMillis();
-      LOG.info("starting compaction on region " +
-        this.regionInfo.getRegionName().toString());
-      boolean status = true;
       doRegionCompactionPrep();
-      for (HStore store : stores.values()) {
-        if(!store.compact()) {
-          status = false;
+      for (HStore store: stores.values()) {
+        Text key = store.compact();
+        if (key != null && midKey == null) {
+          midKey = key;
         }
       }
       doRegionCompactionCleanup();
-      LOG.info("compaction completed on region " +
-        this.regionInfo.getRegionName().toString() + ". Took " +
-        StringUtils.formatTimeDiff(System.currentTimeMillis(), startTime));
-      return status;
+      LOG.info("compaction completed on region " + getRegionName() +
+          ". Took " +
+          StringUtils.formatTimeDiff(System.currentTimeMillis(), startTime));
       
     } finally {
       synchronized (writestate) {
@@ -953,6 +869,7 @@
         writestate.notifyAll();
       }
     }
+    return midKey;
   }
 
   /**
@@ -1030,6 +947,10 @@
     // will add to the unflushed size
     
     this.memcacheSize.set(0L);
+    this.flushRequested = false;
+    
+    // Record latest flush time
+    this.lastFlushTime = System.currentTimeMillis();
     
     for (HStore hstore: stores.values()) {
       hstore.snapshotMemcache();
@@ -1121,11 +1042,12 @@
     this.log.completeCacheFlush(this.regionInfo.getRegionName(),
         regionInfo.getTableDesc().getName(), sequenceId);
 
-    // D. Finally notify anyone waiting on memcache to clear:
+    // C. Finally notify anyone waiting on memcache to clear:
     // e.g. checkResources().
     synchronized (this) {
       notifyAll();
     }
+    
     if (LOG.isDebugEnabled()) {
       LOG.debug("Finished memcache flush for region " +
           this.regionInfo.getRegionName() + " in " +
@@ -1374,8 +1296,8 @@
     Text row = b.getRow();
     long lockid = obtainRowLock(row);
 
-    long commitTime =
-      (b.getTimestamp() == LATEST_TIMESTAMP) ? System.currentTimeMillis() : b.getTimestamp();
+    long commitTime = (b.getTimestamp() == LATEST_TIMESTAMP) ?
+        System.currentTimeMillis() : b.getTimestamp();
       
     try {
       List<Text> deletes = null;
@@ -1612,9 +1534,11 @@
             (val == null ? 0 : val.length));
         stores.get(HStoreKey.extractFamily(key.getColumn())).add(key, val);
       }
-      if (this.flushListener != null && size > this.memcacheFlushSize) {
+      if (this.flushListener != null && !this.flushRequested &&
+          size > this.memcacheFlushSize) {
         // Request a cache flush
         this.flushListener.flushRequested(this);
+        this.flushRequested = true;
       }
     }
   }
@@ -1729,6 +1653,18 @@
       }
     }
   }
+
+  /** {@inheritDoc} */
+  @Override
+  public boolean equals(Object o) {
+    return this.hashCode() == ((HRegion)o).hashCode();
+  }
+  
+  /** {@inheritDoc} */
+  @Override
+  public int hashCode() {
+    return this.regionInfo.getRegionName().hashCode();
+  }
   
   /** {@inheritDoc} */
   @Override
@@ -2011,8 +1947,7 @@
    * @throws IOException
    */
   public static void removeRegionFromMETA(final HRegionInterface srvr,
-    final Text metaRegionName, final Text regionName)
-  throws IOException {
+    final Text metaRegionName, final Text regionName) throws IOException {
     srvr.deleteAll(metaRegionName, regionName, HConstants.LATEST_TIMESTAMP);
   }
 
@@ -2025,8 +1960,7 @@
    * @throws IOException
    */
   public static void offlineRegionInMETA(final HRegionInterface srvr,
-    final Text metaRegionName, final HRegionInfo info)
-  throws IOException {
+    final Text metaRegionName, final HRegionInfo info) throws IOException {
     BatchUpdate b = new BatchUpdate(info.getRegionName());
     info.setOffline(true);
     b.put(COL_REGIONINFO, Writables.getBytes(info));

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=643761&r1=643760&r2=643761&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Tue Apr  1 23:58:26 2008
@@ -220,7 +220,7 @@
       conf.getInt("hbase.master.lease.period", 30 * 1000);
 
     // Cache flushing thread.
-    this.cacheFlusher = new Flusher(this);
+    this.cacheFlusher = new Flusher(conf, this);
     
     // Compaction thread
     this.compactSplitThread = new CompactSplitThread(this);
@@ -295,6 +295,7 @@
               LOG.info("Server quiesced and not serving any regions. " +
               "Starting shutdown");
               stopRequested.set(true);
+              this.outboundMsgs.clear();
               continue;
             }
 
@@ -412,7 +413,7 @@
     // Send interrupts to wake up threads if sleeping so they notice shutdown.
     // TODO: Should we check they are alive?  If OOME could have exited already
     cacheFlusher.interruptPolitely();
-    compactSplitThread.interrupt();
+    compactSplitThread.interruptPolitely();
     synchronized (logRollerLock) {
       this.logRoller.interrupt();
     }
@@ -828,8 +829,8 @@
       } finally {
         this.lock.writeLock().unlock();
       }
-      reportOpen(regionInfo); 
     }
+    reportOpen(regionInfo); 
   }
   
   /*
@@ -1226,10 +1227,6 @@
   /** @return the write lock for the server */
   ReentrantReadWriteLock.WriteLock getWriteLock() {
     return lock.writeLock();
-  }
-
-  void compactionRequested(QueueEntry e) {
-    compactSplitThread.compactionRequested(e);
   }
 
   /**

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HStore.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HStore.java?rev=643761&r1=643760&r2=643761&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HStore.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HStore.java Tue Apr  1 23:58:26 2008
@@ -24,15 +24,11 @@
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.SortedMap;
 import java.util.TreeMap;
-import java.util.SortedSet;
-import java.util.TreeSet;
-import java.util.Map.Entry;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.regex.Matcher;
@@ -44,7 +40,6 @@
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.filter.RowFilterInterface;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
@@ -53,7 +48,6 @@
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.hbase.BloomFilterDescriptor;
@@ -69,7 +63,6 @@
 import org.apache.hadoop.hbase.HStoreKey;
 import org.apache.hadoop.hbase.RemoteExceptionHandler;
 import org.apache.hadoop.hbase.io.Cell;
-import org.apache.hadoop.hbase.io.RowResult;
 
 /**
  * HStore maintains a bunch of data files.  It is responsible for maintaining 
@@ -87,7 +80,7 @@
    * If reference, then the regex has more than just one group.  Group 1 is
    * this files id.  Group 2 the referenced region name, etc.
    */
-  private static Pattern REF_NAME_PARSER =
+  private static final Pattern REF_NAME_PARSER =
     Pattern.compile("^(\\d+)(?:\\.(.+))?$");
   
   private static final String BLOOMFILTER_FILE_NAME = "filter";
@@ -101,15 +94,16 @@
   private final HBaseConfiguration conf;
   private final Path filterDir;
   final Filter bloomFilter;
-  private final Path compactionDir;
 
-  private final Integer compactLock = new Integer(0);
+  private final long desiredMaxFileSize;
+  private volatile long storeSize;
+
   private final Integer flushLock = new Integer(0);
 
   private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
   final AtomicInteger activeScanners = new AtomicInteger(0);
 
-  final String storeName;
+  final Text storeName;
 
   /*
    * Sorted Map of readers keyed by sequence id (Most recent should be last in
@@ -125,8 +119,15 @@
   private final SortedMap<Long, MapFile.Reader> readers =
     new TreeMap<Long, MapFile.Reader>();
 
+  // The most-recent log-seq-ID that's present.  The most-recent such ID means
+  // we can ignore all log messages up to and including that ID (because they're
+  // already reflected in the TreeMaps).
   private volatile long maxSeqId;
+  
+  private final Path compactionDir;
+  private final Integer compactLock = new Integer(0);
   private final int compactionThreshold;
+  
   private final ReentrantReadWriteLock newScannerLock =
     new ReentrantReadWriteLock();
 
@@ -177,8 +178,18 @@
     
     this.compactionDir = HRegion.getCompactionDir(basedir);
     this.storeName =
-      this.info.getEncodedName() + "/" + this.family.getFamilyName();
+      new Text(this.info.getEncodedName() + "/" + this.family.getFamilyName());
+    
+    // By default, we compact if an HStore has more than
+    // MIN_COMMITS_FOR_COMPACTION map files
+    this.compactionThreshold =
+      conf.getInt("hbase.hstore.compactionThreshold", 3);
     
+    // By default we split region if a file > DEFAULT_MAX_FILE_SIZE.
+    this.desiredMaxFileSize =
+      conf.getLong("hbase.hregion.max.filesize", DEFAULT_MAX_FILE_SIZE);
+    this.storeSize = 0L;
+
     if (family.getCompression() == HColumnDescriptor.CompressionType.BLOCK) {
       this.compression = SequenceFile.CompressionType.BLOCK;
     } else if (family.getCompression() ==
@@ -219,21 +230,10 @@
     // MapFiles are in a reliable state.  Every entry in 'mapdir' must have a 
     // corresponding one in 'loginfodir'. Without a corresponding log info
     // file, the entry in 'mapdir' must be deleted.
-    List<HStoreFile> hstoreFiles = loadHStoreFiles(infodir, mapdir);
-    for(HStoreFile hsf: hstoreFiles) {
-      this.storefiles.put(Long.valueOf(hsf.loadInfo(fs)), hsf);
-    }
+    // loadHStoreFiles also computes the max sequence id
+    this.maxSeqId = -1L;
+    this.storefiles.putAll(loadHStoreFiles(infodir, mapdir));
 
-    // Now go through all the HSTORE_LOGINFOFILEs and figure out the
-    // most-recent log-seq-ID that's present.  The most-recent such ID means we
-    // can ignore all log messages up to and including that ID (because they're
-    // already reflected in the TreeMaps).
-    //
-    // If the HSTORE_LOGINFOFILE doesn't contain a number, just ignore it. That
-    // means it was built prior to the previous run of HStore, and so it cannot 
-    // contain any updates also contained in the log.
-    
-    this.maxSeqId = getMaxSequenceId(hstoreFiles);
     if (LOG.isDebugEnabled()) {
       LOG.debug("maximum sequence id for hstore " + storeName + " is " +
           this.maxSeqId);
@@ -250,16 +250,6 @@
         " -- continuing.  Probably DATA LOSS!", e);
     }
 
-    // By default, we compact if an HStore has more than
-    // MIN_COMMITS_FOR_COMPACTION map files
-    this.compactionThreshold =
-      conf.getInt("hbase.hstore.compactionThreshold", 3);
-    
-    // We used to compact in here before bringing the store online.  Instead
-    // get it online quick even if it needs compactions so we can start
-    // taking updates as soon as possible (Once online, can take updates even
-    // during a compaction).
-
     // Move maxSeqId on by one. Why here?  And not in HRegion?
     this.maxSeqId += 1;
     
@@ -276,28 +266,13 @@
         first = false;
       } else {
         this.readers.put(e.getKey(),
-          e.getValue().getReader(this.fs, this.bloomFilter));
+            e.getValue().getReader(this.fs, this.bloomFilter));
       }
     }
   }
-  
-  /* 
-   * @param hstoreFiles
-   * @return Maximum sequence number found or -1.
-   * @throws IOException
-   */
-  private long getMaxSequenceId(final List<HStoreFile> hstoreFiles)
-  throws IOException {
-    long maxSeqID = -1;
-    for (HStoreFile hsf : hstoreFiles) {
-      long seqid = hsf.loadInfo(fs);
-      if (seqid > 0) {
-        if (seqid > maxSeqID) {
-          maxSeqID = seqid;
-        }
-      }
-    }
-    return maxSeqID;
+
+  HColumnDescriptor getFamily() {
+    return this.family;
   }
   
   long getMaxSequenceId() {
@@ -388,7 +363,7 @@
    * @param mapdir qualified path for map file directory
    * @throws IOException
    */
-  private List<HStoreFile> loadHStoreFiles(Path infodir, Path mapdir)
+  private SortedMap<Long, HStoreFile> loadHStoreFiles(Path infodir, Path mapdir)
   throws IOException {
     if (LOG.isDebugEnabled()) {
       LOG.debug("infodir: " + infodir.toString() + " mapdir: " +
@@ -397,7 +372,7 @@
     // Look first at info files.  If a reference, these contain info we need
     // to create the HStoreFile.
     FileStatus infofiles[] = fs.listStatus(infodir);
-    ArrayList<HStoreFile> results = new ArrayList<HStoreFile>(infofiles.length);
+    SortedMap<Long, HStoreFile> results = new TreeMap<Long, HStoreFile>();
     ArrayList<Path> mapfiles = new ArrayList<Path>(infofiles.length);
     for (int i = 0; i < infofiles.length; i++) {
       Path p = infofiles[i].getPath();
@@ -414,6 +389,11 @@
       boolean isReference = isReference(p, m);
       long fid = Long.parseLong(m.group(1));
 
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("loading file " + p.toString() + ", isReference=" +
+            isReference + ", file id=" + fid);
+      }
+      
       HStoreFile curfile = null;
       HStoreFile.Reference reference = null;
       if (isReference) {
@@ -421,6 +401,22 @@
       }
       curfile = new HStoreFile(conf, fs, basedir, info.getEncodedName(),
           family.getFamilyName(), fid, reference);
+
+      storeSize += curfile.length();
+      long storeSeqId = -1;
+      try {
+        storeSeqId = curfile.loadInfo(fs);
+        if (storeSeqId > this.maxSeqId) {
+          this.maxSeqId = storeSeqId;
+        }
+      } catch (IOException e) {
+        // If the HSTORE_LOGINFOFILE doesn't contain a number, just ignore it.
+        // That means it was built prior to the previous run of HStore, and so
+        // it cannot contain any updates also contained in the log.
+        LOG.info("HSTORE_LOGINFOFILE " + curfile +
+            " does not contain a sequence number - ignoring");
+      }
+
       Path mapfile = curfile.getMapFilePath();
       if (!fs.exists(mapfile)) {
         fs.delete(curfile.getInfoFilePath());
@@ -432,7 +428,7 @@
       // TODO: Confirm referent exists.
       
       // Found map and sympathetic info file.  Add this hstorefile to result.
-      results.add(curfile);
+      results.put(storeSeqId, curfile);
       // Keep list of sympathetic data mapfiles for cleaning info dir in next
       // section.  Make sure path is fully qualified for compare.
       mapfiles.add(mapfile);
@@ -581,17 +577,14 @@
       for (MapFile.Reader reader: this.readers.values()) {
         reader.close();
       }
-      this.readers.clear();
       result = new ArrayList<HStoreFile>(storefiles.values());
-      this.storefiles.clear();
       LOG.debug("closed " + this.storeName);
       return result;
     } finally {
       this.lock.writeLock().unlock();
     }
   }
-
-
+  
   //////////////////////////////////////////////////////////////////////////////
   // Flush changes to disk
   //////////////////////////////////////////////////////////////////////////////
@@ -627,10 +620,10 @@
     synchronized(flushLock) {
       // A. Write the Maps out to the disk
       HStoreFile flushedFile = new HStoreFile(conf, fs, basedir,
-        info.getEncodedName(), family.getFamilyName(), -1L, null);
+          info.getEncodedName(), family.getFamilyName(), -1L, null);
       String name = flushedFile.toString();
       MapFile.Writer out = flushedFile.getWriter(this.fs, this.compression,
-        this.bloomFilter);
+          this.bloomFilter);
       
       // Here we tried picking up an existing HStoreFile from disk and
       // interlacing the memcache flush compacting as we go.  The notion was
@@ -644,18 +637,23 @@
       // Related, looks like 'merging compactions' in BigTable paper interlaces
       // a memcache flush.  We don't.
       int entries = 0;
+      long cacheSize = 0;
       try {
         for (Map.Entry<HStoreKey, byte []> es: cache.entrySet()) {
           HStoreKey curkey = es.getKey();
+          byte[] bytes = es.getValue();
           TextSequence f = HStoreKey.extractFamily(curkey.getColumn());
           if (f.equals(this.family.getFamilyName())) {
             entries++;
-            out.append(curkey, new ImmutableBytesWritable(es.getValue()));
+            out.append(curkey, new ImmutableBytesWritable(bytes));
+            cacheSize += curkey.getSize() + (bytes != null ? bytes.length : 0);
           }
         }
       } finally {
         out.close();
       }
+      long newStoreSize = flushedFile.length(); 
+      storeSize += newStoreSize;
 
       // B. Write out the log sequence number that corresponds to this output
       // MapFile.  The MapFile is current up to and including the log seq num.
@@ -676,14 +674,14 @@
         this.storefiles.put(flushid, flushedFile);
         if(LOG.isDebugEnabled()) {
           LOG.debug("Added " + name + " with " + entries +
-            " entries, sequence id " + logCacheFlushId + ", and size " +
-            StringUtils.humanReadableInt(flushedFile.length()) + " for " +
+            " entries, sequence id " + logCacheFlushId + ", data size " +
+            StringUtils.humanReadableInt(cacheSize) + ", file size " +
+            StringUtils.humanReadableInt(newStoreSize) + " for " +
             this.storeName);
         }
       } finally {
         this.lock.writeLock().unlock();
       }
-      return;
     }
   }
 
@@ -692,28 +690,6 @@
   //////////////////////////////////////////////////////////////////////////////
   
   /**
-   * @return True if this store needs compaction.
-   */
-  boolean needsCompaction() {
-    return this.storefiles != null &&
-      (this.storefiles.size() >= this.compactionThreshold || hasReferences());
-  }
-  
-  /*
-   * @return True if this store has references.
-   */
-  private boolean hasReferences() {
-    if (this.storefiles != null) {
-      for (HStoreFile hsf: this.storefiles.values()) {
-        if (hsf.isReference()) {
-          return true;
-        }
-      }
-    }
-    return false;
-  }
-
-  /**
    * Compact the back-HStores.  This method may take some time, so the calling 
    * thread must be able to block for long periods.
    * 
@@ -728,42 +704,66 @@
    * 
    * We don't want to hold the structureLock for the whole time, as a compact() 
    * can be lengthy and we want to allow cache-flushes during this period.
-   * @throws IOException
    * 
-   * @return true if compaction completed successfully
+   * @return mid key if a split is needed, null otherwise
+   * @throws IOException
    */
-  boolean compact() throws IOException {
+  Text compact() throws IOException {
     synchronized (compactLock) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("started compaction of " + storefiles.size() +
-          " files using " + compactionDir.toString() + " for " +
-          this.storeName);
-      }
+      long maxId = -1;
+      List<HStoreFile> filesToCompact = null;
+      synchronized (storefiles) {
+        filesToCompact = new ArrayList<HStoreFile>(this.storefiles.values());
+        if (filesToCompact.size() < 1) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Not compacting " + this.storeName +
+            " because no store files to compact.");
+          }
+          return checkSplit();
+        } else if (filesToCompact.size() == 1) {
+          if (!filesToCompact.get(0).isReference()) {
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Not compacting " + this.storeName +
+              " because only one store file and it is not a reference");
+            }
+            return checkSplit();
+          }
+        } else if (filesToCompact.size() < compactionThreshold) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Not compacting " + this.storeName +
+                " because number of stores " + filesToCompact.size() +
+                " < compaction threshold " + compactionThreshold);
+          }
+          return checkSplit();
+        }
+
+        if (!fs.exists(compactionDir) && !fs.mkdirs(compactionDir)) {
+          LOG.warn("Mkdir on " + compactionDir.toString() + " failed");
+          return checkSplit();
+        }
 
-      // Storefiles are keyed by sequence id. The oldest file comes first.
-      // We need to return out of here a List that has the newest file first.
-      List<HStoreFile> filesToCompact =
-        new ArrayList<HStoreFile>(this.storefiles.values());
-      Collections.reverse(filesToCompact);
-      if (filesToCompact.size() < 1 ||
-        (filesToCompact.size() == 1 && !filesToCompact.get(0).isReference())) {
         if (LOG.isDebugEnabled()) {
-          LOG.debug("nothing to compact for " + this.storeName);
+          LOG.debug("started compaction of " + filesToCompact.size() +
+              " files using " + compactionDir.toString() + " for " +
+              this.storeName);
         }
-        return false;
-      }
 
-      if (!fs.exists(compactionDir) && !fs.mkdirs(compactionDir)) {
-        LOG.warn("Mkdir on " + compactionDir.toString() + " failed");
-        return false;
+        // Storefiles are keyed by sequence id. The oldest file comes first.
+        // We need to return out of here a List that has the newest file first.
+        Collections.reverse(filesToCompact);
+
+        // The max-sequenceID in any of the to-be-compacted TreeMaps is the 
+        // last key of storefiles.
+
+        maxId = this.storefiles.lastKey();
       }
 
       // Step through them, writing to the brand-new MapFile
       HStoreFile compactedOutputFile = new HStoreFile(conf, fs, 
-        this.compactionDir, info.getEncodedName(), family.getFamilyName(),
-        -1L, null);
+          this.compactionDir, info.getEncodedName(), family.getFamilyName(),
+          -1L, null);
       MapFile.Writer compactedOut = compactedOutputFile.getWriter(this.fs,
-        this.compression, this.bloomFilter);
+          this.compression, this.bloomFilter);
       try {
         compactHStoreFiles(compactedOut, filesToCompact);
       } finally {
@@ -771,14 +771,17 @@
       }
 
       // Now, write out an HSTORE_LOGINFOFILE for the brand-new TreeMap.
-      // Compute max-sequenceID seen in any of the to-be-compacted TreeMaps.
-      long maxId = getMaxSequenceId(filesToCompact);
       compactedOutputFile.writeInfo(fs, maxId);
 
       // Move the compaction into place.
       completeCompaction(filesToCompact, compactedOutputFile);
-      return true;
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Completed compaction of " + this.storeName +
+            "; store size is " + StringUtils.humanReadableInt(storeSize));
+      }
     }
+    return checkSplit();
   }
   
   /*
@@ -975,11 +978,11 @@
    * <pre>
    * 1) Wait for active scanners to exit
    * 2) Acquiring the write-lock
-   * 3) Figuring out what MapFiles are going to be replaced
-   * 4) Moving the new compacted MapFile into place
-   * 5) Unloading all the replaced MapFiles.
-   * 6) Deleting all the old MapFile files.
-   * 7) Loading the new TreeMap.
+   * 3) Moving the new compacted MapFile into place
+   * 4) Unloading all the replaced MapFiles and close.
+   * 5) Deleting all the replaced MapFile files.
+   * 6) Loading the new TreeMap.
+   * 7) Compute new store size
    * 8) Releasing the write-lock
    * 9) Allow new scanners to proceed.
    * </pre>
@@ -1027,46 +1030,53 @@
 
         // 4. and 5. Unload all the replaced MapFiles, close and delete.
         
-        List<Long> toDelete = new ArrayList<Long>();
-        for (Map.Entry<Long, HStoreFile> e: this.storefiles.entrySet()) {
-          if (!compactedFiles.contains(e.getValue())) {
-            continue;
-          }
-          Long key = e.getKey();
-          MapFile.Reader reader = this.readers.remove(key);
-          if (reader != null) {
-            reader.close();
+        synchronized (storefiles) {
+          List<Long> toDelete = new ArrayList<Long>();
+          for (Map.Entry<Long, HStoreFile> e: this.storefiles.entrySet()) {
+            if (!compactedFiles.contains(e.getValue())) {
+              continue;
+            }
+            Long key = e.getKey();
+            MapFile.Reader reader = this.readers.remove(key);
+            if (reader != null) {
+              reader.close();
+            }
+            toDelete.add(key);
           }
-          toDelete.add(key);
-        }
 
-        try {
-          for (Long key: toDelete) {
-            HStoreFile hsf = this.storefiles.remove(key);
-            hsf.delete();
-          }
+          try {
+            for (Long key: toDelete) {
+              HStoreFile hsf = this.storefiles.remove(key);
+              hsf.delete();
+            }
 
-          // 6. Loading the new TreeMap.
-          Long orderVal = Long.valueOf(finalCompactedFile.loadInfo(fs));
-          this.readers.put(orderVal,
-            // Use a block cache (if configured) for this reader since
-            // it is the only one.
-            finalCompactedFile.getReader(this.fs, this.bloomFilter,
-                family.isBlockCacheEnabled()));
-          this.storefiles.put(orderVal, finalCompactedFile);
-        } catch (IOException e) {
-          e = RemoteExceptionHandler.checkIOException(e);
-          LOG.error("Failed replacing compacted files for " + this.storeName +
-              ". Compacted file is " + finalCompactedFile.toString() +
-              ".  Files replaced are " + compactedFiles.toString() +
-              " some of which may have been already removed", e);
+            // 6. Loading the new TreeMap.
+            Long orderVal = Long.valueOf(finalCompactedFile.loadInfo(fs));
+            this.readers.put(orderVal,
+                // Use a block cache (if configured) for this reader since
+                // it is the only one.
+                finalCompactedFile.getReader(this.fs, this.bloomFilter,
+                    family.isBlockCacheEnabled()));
+            this.storefiles.put(orderVal, finalCompactedFile);
+          } catch (IOException e) {
+            e = RemoteExceptionHandler.checkIOException(e);
+            LOG.error("Failed replacing compacted files for " + this.storeName +
+                ". Compacted file is " + finalCompactedFile.toString() +
+                ".  Files replaced are " + compactedFiles.toString() +
+                " some of which may have been already removed", e);
+          }
+          // 7. Compute new store size
+          storeSize = 0L;
+          for (HStoreFile hsf: storefiles.values()) {
+            storeSize += hsf.length();
+          }
         }
       } finally {
-        // 7. Releasing the write-lock
+        // 8. Releasing the write-lock
         this.lock.writeLock().unlock();
       }
     } finally {
-      // 8. Allow new scanners to proceed.
+      // 9. Allow new scanners to proceed.
       newScannerLock.writeLock().unlock();
     }
   }
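
The nine steps above nest three levels of exclusion: the scanner lock keeps
new scanners out, the store write-lock drains active readers, and the new
storefiles monitor serializes the file swap against compact() and
checkSplit(). A skeletal sketch of that nesting, with the file shuffling
elided (a reading aid only, not part of the patch):

    newScannerLock.writeLock().lock();      // 1. no new scanners while we work
    try {
      this.lock.writeLock().lock();         // 2. exclusive access to the store
      try {
        synchronized (storefiles) {         // 3.-7. swap files, recompute storeSize
          // ... move compacted file in, drop replaced files, reload reader ...
        }
      } finally {
        this.lock.writeLock().unlock();     // 8. release the write-lock
      }
    } finally {
      newScannerLock.writeLock().unlock();  // 9. let new scanners proceed
    }
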
@@ -1304,7 +1314,7 @@
           
           do {
             // if the row matches, we might want this one.
-            if(rowMatches(origin, readkey)){
+            if (rowMatches(origin, readkey)) {
               // if the cell matches, then we definitely want this key.
               if (cellMatches(origin, readkey)) {
                // store the key if it isn't deleted or superseded by what's
@@ -1323,11 +1333,11 @@
                 // timestamps, so move to the next key
                 continue;
               }
-            } else{
+            } else {
               // the row doesn't match, so we've gone too far.
               break;
             }
-          }while(map.next(readkey, readval)); // advance to the next key
+          } while (map.next(readkey, readval)); // advance to the next key
         }
       }
       
@@ -1572,71 +1582,77 @@
   }
   
   /**
-   * Gets size for the store.
+   * Determines if HStore can be split
    * 
-   * @param midKey Gets set to the middle key of the largest splitable store
-   * file or its set to empty if largest is not splitable.
-   * @return Sizes for the store and the passed <code>midKey</code> is
-   * set to midKey of largest splitable.  Otherwise, its set to empty
-   * to indicate we couldn't find a midkey to split on
-   */
-  HStoreSize size(Text midKey) {
-    long maxSize = 0L;
-    long aggregateSize = 0L;
-    // Not splitable if we find a reference store file present in the store.
-    boolean splitable = true;
+   * @return midKey if store can be split, null otherwise
+   */
+  Text checkSplit() {
     if (this.storefiles.size() <= 0) {
-      return new HStoreSize(0, 0, splitable);
+      return null;
+    }
+    if (storeSize < this.desiredMaxFileSize) {
+      return null;
     }
-    
     this.lock.readLock().lock();
     try {
+      // Not splitable if we find a reference store file present in the store.
+      boolean splitable = true;
+      long maxSize = 0L;
       Long mapIndex = Long.valueOf(0L);
       // Iterate through all the MapFiles
-      for (Map.Entry<Long, HStoreFile> e: storefiles.entrySet()) {
-        HStoreFile curHSF = e.getValue();
-        long size = curHSF.length();
-        aggregateSize += size;
-        if (maxSize == 0L || size > maxSize) {
-          // This is the largest one so far
-          maxSize = size;
-          mapIndex = e.getKey();
-        }
-        if (splitable) {
-          splitable = !curHSF.isReference();
-        }
-      }
-      if (splitable) {
-        MapFile.Reader r = this.readers.get(mapIndex);
-        // seek back to the beginning of mapfile
-        r.reset();
-        // get the first and last keys
-        HStoreKey firstKey = new HStoreKey();
-        HStoreKey lastKey = new HStoreKey();
-        Writable value = new ImmutableBytesWritable();
-        r.next(firstKey, value);
-        r.finalKey(lastKey);
-        // get the midkey
-        HStoreKey mk = (HStoreKey)r.midKey();
-        if (mk != null) {
-          // if the midkey is the same as the first and last keys, then we cannot
-          // (ever) split this region. 
-          if (mk.getRow().equals(firstKey.getRow()) && 
-              mk.getRow().equals(lastKey.getRow())) {
-            return new HStoreSize(aggregateSize, maxSize, false);
+      synchronized (storefiles) {
+        for (Map.Entry<Long, HStoreFile> e: storefiles.entrySet()) {
+          HStoreFile curHSF = e.getValue();
+          long size = curHSF.length();
+          if (size > maxSize) {
+            // This is the largest one so far
+            maxSize = size;
+            mapIndex = e.getKey();
+          }
+          if (splitable) {
+            splitable = !curHSF.isReference();
           }
-          // Otherwise, set midKey
-          midKey.set(mk.getRow());
         }
       }
+      if (!splitable) {
+        return null;
+      }
+      MapFile.Reader r = this.readers.get(mapIndex);
+
+      // seek back to the beginning of mapfile
+      r.reset();
+
+      // get the first and last keys
+      HStoreKey firstKey = new HStoreKey();
+      HStoreKey lastKey = new HStoreKey();
+      Writable value = new ImmutableBytesWritable();
+      r.next(firstKey, value);
+      r.finalKey(lastKey);
+
+      // get the midkey
+      HStoreKey mk = (HStoreKey)r.midKey();
+      if (mk != null) {
+        // if the midkey is the same as the first and last keys, then we cannot
+        // (ever) split this region. 
+        if (mk.getRow().equals(firstKey.getRow()) && 
+            mk.getRow().equals(lastKey.getRow())) {
+          return null;
+        }
+        return mk.getRow();
+      }
    } catch (IOException e) {
      LOG.warn("Failed checking split for " + this.storeName, e);
     } finally {
       this.lock.readLock().unlock();
     }
-    return new HStoreSize(aggregateSize, maxSize, splitable);
+    return null;
   }
   
+  /** @return aggregate size of HStore */
+  public long getSize() {
+    return storeSize;
+  }
+     
   //////////////////////////////////////////////////////////////////////////////
   // File administration
   //////////////////////////////////////////////////////////////////////////////
@@ -1665,7 +1681,7 @@
   /** {@inheritDoc} */
   @Override
   public String toString() {
-    return this.storeName;
+    return this.storeName.toString();
   }
 
   /*

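Taken together, the HStore changes collapse the old needsCompaction/size/
hasReferences trio into a single contract: compact() finishes by returning
checkSplit()'s verdict, a midKey Text when the store is both oversized and
splitable, null otherwise. A minimal sketch of a hypothetical caller; the
caller and its split hook are assumptions, not code from this patch:

    // Hypothetical caller: compact, then split if the store says so.
    Text midKey = store.compact();   // null unless oversized and splitable
    if (midKey != null) {
      // Store exceeds desiredMaxFileSize and has a usable midkey;
      // the enclosing region should be split around midKey.
      requestSplit(region, midKey);  // assumed scheduling hook
    }
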
Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/Memcache.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/Memcache.java?rev=643761&r1=643760&r2=643761&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/Memcache.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/Memcache.java Tue Apr  1 23:58:26 2008
@@ -28,7 +28,6 @@
 import java.util.List;
 import java.util.ArrayList;
 import java.util.Set;
-import java.util.HashSet;
 import java.util.Iterator;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
@@ -63,7 +62,7 @@
     snapshot = 
       Collections.synchronizedSortedMap(new TreeMap<HStoreKey, byte []>());
   }
-
+  
   /**
    * Creates a snapshot of the current Memcache
    */
@@ -196,12 +195,9 @@
   /**
    * @param row
    * @param timestamp
-   * @return the key that matches <i>row</i> exactly, or the one that
-   * immediately preceeds it.
    */
   void getRowKeyAtOrBefore(final Text row, 
-    SortedMap<HStoreKey, Long> candidateKeys)
-  throws IOException {
+    SortedMap<HStoreKey, Long> candidateKeys) {
     this.lock.readLock().lock();
     
     try {

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/util/MetaUtils.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/util/MetaUtils.java?rev=643761&r1=643760&r2=643761&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/util/MetaUtils.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/util/MetaUtils.java Tue Apr  1 23:58:26 2008
@@ -35,12 +35,13 @@
 
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.io.BatchUpdate;
+import org.apache.hadoop.hbase.io.Cell;
 import org.apache.hadoop.hbase.regionserver.HLog;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HScannerInterface;
 import org.apache.hadoop.hbase.HStoreKey;
-import org.apache.hadoop.hbase.io.Cell;
 import org.apache.hadoop.hbase.client.HTable;
 
 /**
@@ -316,13 +317,16 @@
   throws IOException {
     HTable t = new HTable(c, HConstants.META_TABLE_NAME);
     Cell cell = t.get(row, HConstants.COL_REGIONINFO);
+    if (cell == null) {
+      throw new IOException("no information for row " + row);
+    }
     // Throws exception if null.
     HRegionInfo info = Writables.getHRegionInfo(cell);
-    long id = t.startUpdate(row);
+    BatchUpdate b = new BatchUpdate(row);
     info.setOffline(onlineOffline);
-    t.put(id, HConstants.COL_REGIONINFO, Writables.getBytes(info));
-    t.delete(id, HConstants.COL_SERVER);
-    t.delete(id, HConstants.COL_STARTCODE);
-    t.commit(id);
+    b.put(HConstants.COL_REGIONINFO, Writables.getBytes(info));
+    b.delete(HConstants.COL_SERVER);
+    b.delete(HConstants.COL_STARTCODE);
+    t.commit(b);
   }
 }

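The MetaUtils hunk above shows the client API migration this commit leans on
throughout: the lock-id sequence on HTable gives way to a self-contained
BatchUpdate. Side by side, using the same imports as above:

    // Old lock-id idiom (removed):
    //   long id = t.startUpdate(row);
    //   t.put(id, HConstants.COL_REGIONINFO, Writables.getBytes(info));
    //   t.delete(id, HConstants.COL_SERVER);
    //   t.commit(id);

    // New BatchUpdate idiom (as committed above):
    BatchUpdate b = new BatchUpdate(row);
    b.put(HConstants.COL_REGIONINFO, Writables.getBytes(info));
    b.delete(HConstants.COL_SERVER);
    b.delete(HConstants.COL_STARTCODE);
    t.commit(b);
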
Modified: hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/HBaseClusterTestCase.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/HBaseClusterTestCase.java?rev=643761&r1=643760&r2=643761&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/HBaseClusterTestCase.java (original)
+++ hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/HBaseClusterTestCase.java Tue Apr  1 23:58:26 2008
@@ -44,7 +44,8 @@
   protected MiniDFSCluster dfsCluster;
   protected int regionServers;
   protected boolean startDfs;
-  
+
+  /** default constructor */
   public HBaseClusterTestCase() {
     this(1);
   }
@@ -53,6 +54,7 @@
    * Start a MiniHBaseCluster with regionServers region servers in-process to
    * start with. Also, start a MiniDfsCluster before starting the hbase cluster.
    * The configuration used will be edited so that this works correctly.
+   * @param regionServers number of region servers to start.
    */  
   public HBaseClusterTestCase(int regionServers) {
     this(regionServers, true);
@@ -65,6 +67,8 @@
    * configured in hbase-site.xml and is already started, or you have started a
    * MiniDFSCluster on your own and edited the configuration in memory. (You 
    * can modify the config used by overriding the preHBaseClusterSetup method.)
+   * @param regionServers number of region servers to start.
+   * @param startDfs set to true if MiniDFS should be started
    */
   public HBaseClusterTestCase(int regionServers, boolean startDfs) {
     super();
@@ -81,9 +85,11 @@
   /**
    * Actually start the MiniHBase instance.
    */
+  @SuppressWarnings("unused")
   protected void HBaseClusterSetup() throws Exception {
     // start the mini cluster
     this.cluster = new MiniHBaseCluster(conf, regionServers);
+    // opening the META table ensures that the cluster is running
     HTable meta = new HTable(conf, new Text(".META."));
   }
   

Modified: hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/HBaseTestCase.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/HBaseTestCase.java?rev=643761&r1=643760&r2=643761&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/HBaseTestCase.java (original)
+++ hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/HBaseTestCase.java Tue Apr  1 23:58:26 2008
@@ -102,11 +102,8 @@
     localfs =
      (conf.get("fs.default.name", "file:///").compareTo("file:///") == 0);
 
-    try {
+    if (fs == null) {
       this.fs = FileSystem.get(conf);
-    } catch (IOException e) {
-      LOG.fatal("error getting file system", e);
-      throw e;
     }
     try {
       if (localfs) {
@@ -509,54 +506,88 @@
    */
   public static class HTableIncommon implements Incommon {
     final HTable table;
+    private BatchUpdate batch;
+    
+    private void checkBatch() {
+      if (batch == null) {
+        throw new IllegalStateException("No batch update in progress.");
+      }
+    }
+    
     /**
      * @param table
      */
     public HTableIncommon(final HTable table) {
       super();
       this.table = table;
+      this.batch = null;
     }
     /** {@inheritDoc} */
-    public void abort(long lockid) {
-      this.table.abort(lockid);
+    public void abort(@SuppressWarnings("unused") long lockid) {
+      if (this.batch != null) {
+        this.batch = null;
+      }
     }
     /** {@inheritDoc} */
-    public void commit(long lockid) throws IOException {
-      this.table.commit(lockid);
+    public void commit(@SuppressWarnings("unused") long lockid)
+    throws IOException {
+      checkBatch();
+      this.table.commit(batch);
+      this.batch = null;
     }
+
     /** {@inheritDoc} */
-    public void commit(long lockid, final long ts) throws IOException {
-      this.table.commit(lockid, ts);
+    public void commit(@SuppressWarnings("unused") long lockid, final long ts)
+    throws IOException {
+      checkBatch();
+      this.batch.setTimestamp(ts);
+      this.table.commit(batch);
+      this.batch = null;
     }
+
     /** {@inheritDoc} */
-    public void put(long lockid, Text column, byte[] val) {
-      this.table.put(lockid, column, val);
+    public void put(@SuppressWarnings("unused") long lockid, Text column,
+        byte[] val) {
+      checkBatch();
+      this.batch.put(column, val);
     }
+
     /** {@inheritDoc} */
-    public void delete(long lockid, Text column) {
-      this.table.delete(lockid, column);
+    public void delete(@SuppressWarnings("unused") long lockid, Text column) {
+      checkBatch();
+      this.batch.delete(column);
     }
+    
     /** {@inheritDoc} */
     public void deleteAll(Text row, Text column, long ts) throws IOException {
       this.table.deleteAll(row, column, ts);
     }
+    
     /** {@inheritDoc} */
     public long startUpdate(Text row) {
-      return this.table.startUpdate(row);
+      if (this.batch != null) {
+        throw new IllegalStateException("Batch update already in progress.");
+      }
+      this.batch = new BatchUpdate(row);
+      return 0L;
     }
+
     /** {@inheritDoc} */
     public HScannerInterface getScanner(Text [] columns, Text firstRow,
         long ts) throws IOException {
       return this.table.obtainScanner(columns, firstRow, ts, null);
     }
+    
     /** {@inheritDoc} */
     public Cell get(Text row, Text column) throws IOException {
       return this.table.get(row, column);
     }
+    
     /** {@inheritDoc} */
     public Cell[] get(Text row, Text column, int versions) throws IOException {
       return this.table.get(row, column, versions);
     }
+    
     /** {@inheritDoc} */
     public Cell[] get(Text row, Text column, long ts, int versions)
     throws IOException {
@@ -576,8 +607,10 @@
         fail(column.toString() + " at timestamp " + timestamp + 
           "\" was expected to be \"" + value + " but was null");
       }
-      assertEquals(column.toString() + " at timestamp " 
-        + timestamp, value, new String(cell_value.getValue()));
+      if (cell_value != null) {
+        assertEquals(column.toString() + " at timestamp " 
+            + timestamp, value, new String(cell_value.getValue()));
+      }
     }
   }
 }

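HTableIncommon keeps the old lock-id surface alive for tests while routing
everything through one pending BatchUpdate: startUpdate opens the batch, the
lockid arguments are ignored, and commit or abort closes it. A short usage
sketch under that contract (table, row, column and value are placeholders):

    HTableIncommon incommon = new HTableIncommon(table);
    long lockid = incommon.startUpdate(row);  // opens the internal BatchUpdate
    incommon.put(lockid, column, value);      // lockid ignored; the batch collects it
    incommon.commit(lockid);                  // commits and clears the batch;
                                              // a second commit fails checkBatch()
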
Modified: hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/TestHBaseCluster.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/TestHBaseCluster.java?rev=643761&r1=643760&r2=643761&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/TestHBaseCluster.java (original)
+++ hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/TestHBaseCluster.java Tue Apr  1 23:58:26 2008
@@ -24,6 +24,8 @@
 import java.util.Set;
 import java.util.TreeMap;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HTable;
@@ -33,6 +35,7 @@
  * Test HBase Master and Region servers, client API 
  */
 public class TestHBaseCluster extends HBaseClusterTestCase {
+  private static final Log LOG = LogFactory.getLog(TestHBaseCluster.class);
 
   private HTableDescriptor desc;
   private HBaseAdmin admin;
@@ -104,7 +107,7 @@
           (ANCHORSTR + k).getBytes(HConstants.UTF8_ENCODING));
       table.commit(b);
     }
-    System.out.println("Write " + NUM_VALS + " rows. Elapsed time: "
+    LOG.info("Write " + NUM_VALS + " rows. Elapsed time: "
         + ((System.currentTimeMillis() - startTime) / 1000.0));
 
     // Read them back in
@@ -134,7 +137,7 @@
           teststr.compareTo(bodystr) == 0);
     }
 
-    System.out.println("Read " + NUM_VALS + " rows. Elapsed time: "
+    LOG.info("Read " + NUM_VALS + " rows. Elapsed time: "
         + ((System.currentTimeMillis() - startTime) / 1000.0));
   }
   
@@ -175,7 +178,7 @@
             anchorFetched++;
             
           } else {
-            System.out.println(col);
+            LOG.info(col);
           }
         }
         curVals.clear();
@@ -184,7 +187,7 @@
       assertEquals("Expected " + NUM_VALS + " " + CONTENTS_BASIC + " values, but fetched " + contentsFetched, NUM_VALS, contentsFetched);
       assertEquals("Expected " + NUM_VALS + " " + ANCHORNUM + " values, but fetched " + anchorFetched, NUM_VALS, anchorFetched);
 
-      System.out.println("Scanned " + NUM_VALS
+      LOG.info("Scanned " + NUM_VALS
           + " rows. Elapsed time: "
           + ((System.currentTimeMillis() - startTime) / 1000.0));
 

Modified: hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/mapred/TestTableIndex.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/mapred/TestTableIndex.java?rev=643761&r1=643760&r2=643761&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/mapred/TestTableIndex.java (original)
+++ hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/mapred/TestTableIndex.java Tue Apr  1 23:58:26 2008
@@ -32,6 +32,7 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.HColumnDescriptor;
@@ -42,7 +43,6 @@
 import org.apache.hadoop.hbase.HStoreKey;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.MultiRegionTable;
-import org.apache.hadoop.hbase.StaticTestEnvironment;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
@@ -76,9 +76,9 @@
 
 
   private HTableDescriptor desc;
+  private JobConf jobConf = null;
 
-  private Path dir;
-
+  /** default constructor */
   public TestTableIndex() {
     // Enable DEBUG-level MR logging.
     Logger.getLogger("org.apache.hadoop.mapred").setLevel(Level.DEBUG);
@@ -105,7 +105,6 @@
     // Create a table.
     HBaseAdmin admin = new HBaseAdmin(conf);
     admin.createTable(desc);
-
     // Populate a table into multiple regions
     makeMultiRegionTable(conf, cluster, dfsCluster.getFileSystem(), TABLE_NAME,
       INPUT_COLUMN);
@@ -116,6 +115,14 @@
     assertTrue(startKeys.length > 1);
   }
 
+  /** {@inheritDoc} */
+  @Override
+  public void tearDown() throws Exception {
+    if (jobConf != null) {
+      FileUtil.fullyDelete(new File(jobConf.get("hadoop.tmp.dir")));
+    }
+  }
+
   /**
    * Test HBase map/reduce
    * 
@@ -135,7 +142,7 @@
     conf.set("hbase.index.conf", createIndexConfContent());
 
     try {
-      JobConf jobConf = new JobConf(conf, TestTableIndex.class);
+      jobConf = new JobConf(conf, TestTableIndex.class);
       jobConf.setJobName("index column contents");
       jobConf.setNumMapTasks(2);
       // number of indexes to partition into

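TestTableIndex now keeps the JobConf in a field so tearDown can scrub what
MapReduce leaves under hadoop.tmp.dir; TestTableMapReduce below applies the
same cleanup inline in a finally block. The shared idiom, reduced to a
sketch:

    JobConf jobConf = null;
    try {
      jobConf = new JobConf(conf, TestTableIndex.class);
      // ... configure and run the job ...
    } finally {
      if (jobConf != null) {
        // FileUtil.fullyDelete recursively removes the local temp tree.
        FileUtil.fullyDelete(new File(jobConf.get("hadoop.tmp.dir")));
      }
    }
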
Modified: hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/mapred/TestTableMapReduce.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/mapred/TestTableMapReduce.java?rev=643761&r1=643760&r2=643761&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/mapred/TestTableMapReduce.java (original)
+++ hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/mapred/TestTableMapReduce.java Tue Apr  1 23:58:26 2008
@@ -20,13 +20,14 @@
 package org.apache.hadoop.hbase.mapred;
 
 import java.io.IOException;
+import java.io.File;
 import java.io.UnsupportedEncodingException;
 import java.util.Map;
 import java.util.TreeMap;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
@@ -35,7 +36,6 @@
 import org.apache.hadoop.hbase.HStoreKey;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.MultiRegionTable;
-import org.apache.hadoop.hbase.StaticTestEnvironment;
 import org.apache.hadoop.hbase.io.BatchUpdate;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.io.MapWritable;
@@ -66,8 +66,6 @@
     TEXT_OUTPUT_COLUMN
   };
 
-  private Path dir;
-  
   private static byte[][] values = null;
   
   static {
@@ -193,8 +191,9 @@
     @SuppressWarnings("deprecation")
     MiniMRCluster mrCluster = new MiniMRCluster(2, fs.getUri().toString(), 1);
 
+    JobConf jobConf = null;
     try {
-      JobConf jobConf = new JobConf(conf, TestTableMapReduce.class);
+      jobConf = new JobConf(conf, TestTableMapReduce.class);
       jobConf.setJobName("process column contents");
       jobConf.setNumMapTasks(1);
       jobConf.setNumReduceTasks(1);
@@ -215,6 +214,9 @@
     verify(SINGLE_REGION_TABLE_NAME);
     } finally {
       mrCluster.shutdown();
+      if (jobConf != null) {
+        FileUtil.fullyDelete(new File(jobConf.get("hadoop.tmp.dir")));
+      }
     }
   }
   
@@ -244,8 +246,9 @@
     @SuppressWarnings("deprecation")
     MiniMRCluster mrCluster = new MiniMRCluster(2, fs.getUri().toString(), 1);
 
+    JobConf jobConf = null;
     try {
-      JobConf jobConf = new JobConf(conf, TestTableMapReduce.class);
+      jobConf = new JobConf(conf, TestTableMapReduce.class);
       jobConf.setJobName("process column contents");
       jobConf.setNumMapTasks(2);
       jobConf.setNumReduceTasks(1);
@@ -262,6 +265,9 @@
       verify(MULTI_REGION_TABLE_NAME);
     } finally {
       mrCluster.shutdown();
+      if (jobConf != null) {
+        FileUtil.fullyDelete(new File(jobConf.get("hadoop.tmp.dir")));
+      }
     }
   }
 

Modified: hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/TestCompaction.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/TestCompaction.java?rev=643761&r1=643760&r2=643761&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/TestCompaction.java (original)
+++ hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/TestCompaction.java Tue Apr  1 23:58:26 2008
@@ -93,7 +93,6 @@
    */
   public void testCompaction() throws Exception {
     createStoreFile(r);
-    assertFalse(r.compactIfNeeded());
     for (int i = 0; i < COMPACTION_THRESHOLD; i++) {
       createStoreFile(r);
     }
@@ -106,35 +105,8 @@
       r.get(STARTROW, COLUMN_FAMILY_TEXT, 100 /*Too many*/);
     // Assert that I can get > 5 versions (Should be at least 5 in there).
     assertTrue(cellValues.length >= 5);
-    // Try to run compaction concurrent with a thread flush just to see that
-    // we can.
-    final HRegion region = this.r;
-    Thread t1 = new Thread() {
-      @Override
-      public void run() {
-        try {
-          region.flushcache();
-        } catch (IOException e) {
-          e.printStackTrace();
-        }
-      }
-    };
-    Thread t2 = new Thread() {
-      @Override
-      public void run() {
-        try {
-          assertTrue(region.compactIfNeeded());
-        } catch (IOException e) {
-          e.printStackTrace();
-        }
-      }
-    };
-    t1.setDaemon(true);
-    t1.start();
-    t2.setDaemon(true);
-    t2.start();
-    t1.join();
-    t2.join();
+    r.flushcache();
+    r.compactStores();
     // Now assert that there are 4 versions of a record only: that's the
     // 3 versions that should be in the compacted store and then the one more
     // we added when we flushed. But could be 3 only if the flush happened
@@ -170,7 +142,8 @@
     // compacted store and the flush above when we added deletes.  Add more
     // content to be certain.
     createSmallerStoreFile(this.r);
-    assertTrue(r.compactIfNeeded());
+    r.flushcache();
+    r.compactStores();
     // Assert that the first row is still deleted.
     cellValues = r.get(STARTROW, COLUMN_FAMILY_TEXT, 100 /*Too many*/);
     assertNull(cellValues);

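With compactIfNeeded gone, TestCompaction no longer races a flush thread
against a compaction thread; it drives both steps deterministically. The new
core sequence (region setup elided):

    r.flushcache();      // push memcache contents out as a new store file
    r.compactStores();   // compact synchronously; thresholds are checked inside HStore
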
Modified: hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/TestHMemcache.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/TestHMemcache.java?rev=643761&r1=643760&r2=643761&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/TestHMemcache.java (original)
+++ hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/TestHMemcache.java Tue Apr  1 23:58:26 2008
@@ -21,6 +21,7 @@
 
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
+import java.util.List;
 import java.util.Map;
 import java.util.SortedMap;
 import java.util.TreeMap;
@@ -44,6 +45,13 @@
   
   private static final String COLUMN_FAMILY = "column";
 
+  private static final int FIRST_ROW = 1;
+  private static final int NUM_VALS = 1000;
+  private static final Text CONTENTS_BASIC = new Text("contents:basic");
+  private static final String CONTENTSTR = "contentstr";
+  private static final String ANCHORNUM = "anchor:anchornum-";
+  private static final String ANCHORSTR = "anchorstr";
+
   /** {@inheritDoc} */
   @Override
   public void setUp() throws Exception {
@@ -51,6 +59,50 @@
     this.hmemcache = new Memcache();
   }
 
+  /**
+   * @throws UnsupportedEncodingException
+   */
+  public void testMemcache() throws UnsupportedEncodingException {
+    for (int k = FIRST_ROW; k <= NUM_VALS; k++) {
+      Text row = new Text("row_" + k);
+      HStoreKey key =
+        new HStoreKey(row, CONTENTS_BASIC, System.currentTimeMillis());
+      hmemcache.add(key, (CONTENTSTR + k).getBytes(HConstants.UTF8_ENCODING));
+      
+      key =
+        new HStoreKey(row, new Text(ANCHORNUM + k), System.currentTimeMillis());
+      hmemcache.add(key, (ANCHORSTR + k).getBytes(HConstants.UTF8_ENCODING));
+    }
+
+    // Read them back
+
+    for (int k = FIRST_ROW; k <= NUM_VALS; k++) {
+      List<Cell> results;
+      Text row = new Text("row_" + k);
+      HStoreKey key = new HStoreKey(row, CONTENTS_BASIC, Long.MAX_VALUE);
+      results = hmemcache.get(key, 1);
+      assertNotNull("no data for " + key.toString(), results);
+      assertEquals(1, results.size());
+      String bodystr = new String(results.get(0).getValue(),
+          HConstants.UTF8_ENCODING);
+      String teststr = CONTENTSTR + k;
+      assertTrue("Incorrect value for key: (" + key.toString() +
+          "), expected: '" + teststr + "' got: '" +
+          bodystr + "'", teststr.compareTo(bodystr) == 0);
+      
+      key = new HStoreKey(row, new Text(ANCHORNUM + k), Long.MAX_VALUE);
+      results = hmemcache.get(key, 1);
+      assertNotNull("no data for " + key.toString(), results);
+      assertEquals(1, results.size());
+      bodystr = new String(results.get(0).getValue(),
+          HConstants.UTF8_ENCODING);
+      teststr = ANCHORSTR + k;
+      assertTrue("Incorrect value for key: (" + key.toString() +
+          "), expected: '" + teststr + "' got: '" + bodystr + "'",
+          teststr.compareTo(bodystr) == 0);
+    }
+  }
+
   private Text getRowName(final int index) {
     return new Text("row" + Integer.toString(index));
   }
@@ -175,8 +227,8 @@
     }
   }
   
-  /** For HBASE-528 **/
-  public void testGetRowKeyAtOrBefore() throws IOException {
+  /** For HBASE-528 */
+  public void testGetRowKeyAtOrBefore() {
     // set up some test data
     Text t10 = new Text("010");
     Text t20 = new Text("020");


